Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/14 23:49:28 UTC

[1/7] flink git commit: [FLINK-9530][metrics] Fix numRecords task metric for chains

Repository: flink
Updated Branches:
  refs/heads/master 7e51b90d9 -> 009e9fe4e


[FLINK-9530][metrics] Fix numRecords task metric for chains

This closes #6126.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03d77b86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03d77b86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03d77b86

Branch: refs/heads/master
Commit: 03d77b862b845fc2b2836fc35fdbc8793d5064b7
Parents: 7e51b90
Author: zentol <ch...@apache.org>
Authored: Tue Jun 5 15:01:10 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 19:33:36 2018 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OperatorChain.java  |  2 -
 .../runtime/tasks/OneInputStreamTaskTest.java   | 51 +++++++++++++++
 .../runtime/tasks/TwoInputStreamTaskTest.java   | 67 ++++++++++++++++++++
 3 files changed, 118 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/03d77b86/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 8d1cbde..c105ad7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -442,8 +442,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				Counter tmpNumRecordsIn;
 				try {
 					OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
-					ioMetricGroup.reuseInputMetricsForTask();
-					ioMetricGroup.reuseOutputMetricsForTask();
 					tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
 				} catch (Exception e) {
 					LOG.warn("An exception occurred during the metrics setup.", e);
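
For context: the two removed calls presumably registered each chained operator's numRecordsIn/numRecordsOut counter as the task-level counter as well, so for an operator chain the task metric counted every record once per operator instead of once per task. A minimal standalone sketch of that over-counting (hypothetical class, not the actual OperatorChain wiring; it only uses the public Counter/SimpleCounter types):

    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.SimpleCounter;

    class ChainOverCountSketch {
        public static void main(String[] args) {
            // one numRecordsIn counter per chained operator, as each has its own OperatorIOMetricGroup
            Counter[] operatorNumRecordsIn = {new SimpleCounter(), new SimpleCounter(), new SimpleCounter()};
            int recordsEnteringTask = 5;
            for (int r = 0; r < recordsEnteringTask; r++) {
                for (Counter c : operatorNumRecordsIn) {
                    c.inc(); // every operator in the chain sees the record
                }
            }
            long taskNumRecordsInIfAllReused = 0;
            for (Counter c : operatorNumRecordsIn) {
                taskNumRecordsInIfAllReused += c.getCount();
            }
            System.out.println(taskNumRecordsInIfAllReused); // 15, although only 5 records entered the task
        }
    }

The tests added below pin down the intended semantics instead: the task-level counters should only reflect the records entering and leaving the chain.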

http://git-wip-us.apache.org/repos/asf/flink/blob/03d77b86/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 81fb447..af776d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -612,6 +614,55 @@ public class OneInputStreamTaskTest extends TestLogger {
 	}
 
 	@Test
+	public void testOperatorMetricReuse() throws Exception {
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setupOperatorChain(new OperatorID(), new DuplicatingOperator())
+			.chain(new OperatorID(), new DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+			.chain(new OperatorID(), new DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+			.finish();
+
+		final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+			@Override
+			public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+				return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
+			}
+		};
+
+		final StreamMockEnvironment env = new StreamMockEnvironment(
+			testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
+			@Override
+			public TaskMetricGroup getMetricGroup() {
+				return taskMetricGroup;
+			}
+		};
+
+		final Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+		testHarness.invoke(env);
+		testHarness.waitForTaskRunning();
+
+		final int numRecords = 5;
+
+		for (int x = 0; x < numRecords; x++) {
+			testHarness.processElement(new StreamRecord<>("hello"));
+		}
+		testHarness.waitForInputProcessing();
+
+		assertEquals(numRecords, numRecordsInCounter.getCount());
+		assertEquals(numRecords * 2 * 2 * 2, numRecordsOutCounter.getCount());
+	}
+
+	static class DuplicatingOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
+		@Override
+		public void processElement(StreamRecord<String> element) {
+			output.collect(element);
+			output.collect(element);
+		}
+	}
+
+	@Test
 	public void testWatermarkMetrics() throws Exception {
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 

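The assertions above work out as follows: 5 records enter the chain, and each of the three DuplicatingOperators emits every element twice, so the chain emits 5 * 2 * 2 * 2 = 40 records. The test therefore expects the task-level metric group to report 5 records in and 40 records out.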
http://git-wip-us.apache.org/repos/asf/flink/blob/03d77b86/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 58e28b3..38b262c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -21,12 +21,14 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -36,6 +38,8 @@ import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -48,6 +52,8 @@ import org.junit.Test;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests
  * implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}.
@@ -383,6 +389,67 @@ public class TwoInputStreamTaskTest {
 	}
 
 	@Test
+	public void testOperatorMetricReuse() throws Exception {
+		final TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setupOperatorChain(new OperatorID(), new DuplicatingOperator())
+			.chain(new OperatorID(), new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+			.chain(new OperatorID(), new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+			.finish();
+
+		final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+			@Override
+			public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+				return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
+			}
+		};
+
+		final StreamMockEnvironment env = new StreamMockEnvironment(
+			testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
+			@Override
+			public TaskMetricGroup getMetricGroup() {
+				return taskMetricGroup;
+			}
+		};
+
+		final Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
+		final Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+
+		testHarness.invoke(env);
+		testHarness.waitForTaskRunning();
+
+		final int numRecords1 = 5;
+		final int numRecords2 = 3;
+
+		for (int x = 0; x < numRecords1; x++) {
+			testHarness.processElement(new StreamRecord<>("hello"), 0, 0);
+		}
+
+		for (int x = 0; x < numRecords2; x++) {
+			testHarness.processElement(new StreamRecord<>("hello"), 1, 0);
+		}
+		testHarness.waitForInputProcessing();
+
+		assertEquals(numRecords1 + numRecords2, numRecordsInCounter.getCount());
+		assertEquals((numRecords1 + numRecords2) * 2 * 2 * 2, numRecordsOutCounter.getCount());
+	}
+
+	static class DuplicatingOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String> {
+
+		@Override
+		public void processElement1(StreamRecord<String> element) {
+			output.collect(element);
+			output.collect(element);
+		}
+
+		@Override
+		public void processElement2(StreamRecord<String> element) {
+			output.collect(element);
+			output.collect(element);
+		}
+	}
+
+	@Test
 	public void testWatermarkMetrics() throws Exception {
 		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 

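The same accounting applies to the two-input variant: 5 + 3 = 8 records enter the chain, each of the three duplicating operators doubles them, so the expected task-level counts are 8 records in and 8 * 2 * 2 * 2 = 64 records out.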

[7/7] flink git commit: [hotfix] Remove duplicate lines

Posted by ch...@apache.org.
[hotfix] Remove duplicate lines


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/009e9fe4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/009e9fe4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/009e9fe4

Branch: refs/heads/master
Commit: 009e9fe4e9a24ccad949d75d745f9a1d8cf3ae98
Parents: 0dc0e36
Author: zentol <ch...@apache.org>
Authored: Thu Jun 14 23:15:48 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 23:15:48 2018 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/orc/OrcRowInputFormat.java       | 1 -
 flink-core/src/main/java/org/apache/flink/types/StringValue.java    | 1 -
 2 files changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/009e9fe4/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
index 08c1164..ae28194 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
@@ -304,7 +304,6 @@ public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTyp
 	@Override
 	public void closeInputFormat() throws IOException {
 		this.rows = null;
-		this.rows = null;
 		this.schema = null;
 		this.rowBatch = null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/009e9fe4/flink-core/src/main/java/org/apache/flink/types/StringValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
index 382bc17..feeec2a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
@@ -192,7 +192,6 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 		for (int i = 0; i < len; i++) {
 			this.value[i] = value.charAt(offset + i);
 		}
-		this.len = len;
 		this.hashCode = 0;
 	}
 	


[5/7] flink git commit: [FLINK-9590][metrics] Make HistogramDump immutable

Posted by ch...@apache.org.
[FLINK-9590][metrics] Make HistogramDump immutable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24266fd6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24266fd6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24266fd6

Branch: refs/heads/master
Commit: 24266fd63b72c76e77ed86e46392f7427e4d1f8b
Parents: 2fc6499
Author: zentol <ch...@apache.org>
Authored: Thu Jun 14 19:10:53 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 19:47:06 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/metrics/dump/MetricDump.java  | 22 ++++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24266fd6/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
index c2d1eea..e0d6bbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
@@ -94,17 +94,17 @@ public abstract class MetricDump {
 	 * Container for the values of a {@link org.apache.flink.metrics.Histogram}.
 	 */
 	public static class HistogramDump extends MetricDump {
-		public long min;
-		public long max;
-		public double mean;
-		public double median;
-		public double stddev;
-		public double p75;
-		public double p90;
-		public double p95;
-		public double p98;
-		public double p99;
-		public double p999;
+		public final long min;
+		public final long max;
+		public final double mean;
+		public final double median;
+		public final double stddev;
+		public final double p75;
+		public final double p90;
+		public final double p95;
+		public final double p98;
+		public final double p99;
+		public final double p999;
 
 		public HistogramDump(QueryScopeInfo scopeInfo, String name,
 			long min, long max, double mean, double median, double stddev,

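With the fields final, they can only be assigned in the constructor shown above, which already receives all of these values. A minimal standalone sketch of the same pattern (hypothetical class, not the actual MetricDump code):

    // an immutable value holder: final fields, assigned exactly once in the constructor
    final class HistogramStatsSketch {
        final long min;
        final long max;
        final double mean;

        HistogramStatsSketch(long min, long max, double mean) {
            this.min = min;
            this.max = max;
            this.mean = mean;
        }
    }

Because every field is final (and primitive here), instances cannot change after construction and are safe to share across threads without further synchronization.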

[6/7] flink git commit: [FLINK-9591][py] Remove remnants of distributed-cache logic

Posted by ch...@apache.org.
[FLINK-9591][py] Remove remnants of distributed-cache logic


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dc0e36f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dc0e36f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dc0e36f

Branch: refs/heads/master
Commit: 0dc0e36f204401f14e078d24348421cd4140577d
Parents: 24266fd
Author: zentol <ch...@apache.org>
Authored: Thu Jun 14 19:58:32 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 22:35:13 2018 +0200

----------------------------------------------------------------------
 docs/dev/batch/python.md                                |  8 ++------
 .../java/org/apache/flink/python/api/PythonOptions.java | 10 ----------
 .../org/apache/flink/python/api/PythonPlanBinder.java   | 12 +-----------
 .../apache/flink/python/api/flink/plan/Environment.py   |  3 ---
 4 files changed, 3 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0dc0e36f/docs/dev/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/python.md b/docs/dev/batch/python.md
index 2211102..486aa18 100644
--- a/docs/dev/batch/python.md
+++ b/docs/dev/batch/python.md
@@ -136,10 +136,7 @@ The first two do as the name suggests.
 Please refer to [Data Sinks](#data-sinks) for more information on writing to files.
 
 Once you specified the complete program you need to call `execute` on
-the `Environment`. This will either execute on your local machine or submit your program
-for execution on a cluster, depending on how Flink was started. You can force
-a local execution by using `execute(local=True)`.
-
+the `Environment`. This will submit your program for execution on a cluster.
 {% top %}
 
 Project setup
@@ -159,8 +156,7 @@ Lazy Evaluation
 All Flink programs are executed lazily: When the program's main method is executed, the data loading
 and transformations do not happen directly. Rather, each operation is created and added to the
 program's plan. The operations are actually executed when one of the `execute()` methods is invoked
-on the Environment object. Whether the program is executed locally or on a cluster depends
-on the environment of the program.
+on the Environment object.
 
 The lazy evaluation lets you construct sophisticated programs that Flink executes as one
 holistically planned unit.

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc0e36f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
index 4137c11..f89a0fe 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
@@ -20,8 +20,6 @@ package org.apache.flink.python.api;
 
 import org.apache.flink.configuration.ConfigOption;
 
-import java.io.File;
-
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
@@ -62,14 +60,6 @@ public class PythonOptions {
 		key("python.mmap.tmp.dir")
 			.noDefaultValue();
 
-	/**
-	 * The config parameter defining where the flink python library and user supplied files will be uploaded to before
-	 * registering them with the Distributed Cache. This directory must be accessible from all worker nodes.
-	 */
-	public static final ConfigOption<String> DC_TMP_DIR =
-		key("python.dc.tmp.dir")
-			.defaultValue(System.getProperty("java.io.tmpdir") + File.separator + "flink_dc");
-
 	private PythonOptions() {
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc0e36f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 1df15a5..1182708 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -80,7 +80,6 @@ public class PythonPlanBinder {
 	private final Configuration operatorConfig;
 
 	private final String tmpPlanFilesDir;
-	private Path tmpDistributedDir;
 
 	private final SetCache sets = new SetCache();
 	private int currentEnvironmentID = 0;
@@ -109,8 +108,6 @@ public class PythonPlanBinder {
 			? configuredPlanTmpPath
 			: System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
 
-		tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));
-
 		operatorConfig = new Configuration();
 		operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
 		String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
@@ -264,25 +261,18 @@ public class PythonPlanBinder {
 	 */
 	private enum Parameters {
 		DOP,
-		MODE,
 		RETRY,
 		ID
 	}
 
 	private void receiveParameters(ExecutionEnvironment env) throws IOException {
-		for (int x = 0; x < 4; x++) {
+		for (int x = 0; x < Parameters.values().length; x++) {
 			Tuple value = (Tuple) streamer.getRecord(true);
 			switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
 				case DOP:
 					Integer dop = value.<Integer>getField(1);
 					env.setParallelism(dop);
 					break;
-				case MODE:
-					if (value.<Boolean>getField(1)) {
-						LOG.info("Local execution specified, using default for {}.", PythonOptions.DC_TMP_DIR);
-						tmpDistributedDir = new Path(PythonOptions.DC_TMP_DIR.defaultValue());
-					}
-					break;
 				case RETRY:
 					int retry = value.<Integer>getField(1);
 					env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retry, 10000L));
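
Deriving the loop bound from Parameters.values().length keeps the parameter handshake in sync with the enum, so removing the MODE constant does not leave behind a stale hardcoded count of 4. A tiny standalone illustration of the idiom (hypothetical enum, not the Flink class):

    class EnumLoopSketch {
        enum Parameters { DOP, RETRY, ID }

        public static void main(String[] args) {
            // one iteration per remaining constant; adding or removing a constant adjusts the bound automatically
            for (int x = 0; x < Parameters.values().length; x++) {
                System.out.println(Parameters.values()[x]);
            }
        }
    }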

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc0e36f/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 797ae96..2cc6ecc 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -89,7 +89,6 @@ class Environment(object):
 
         #parameters
         self._dop = -1
-        self._local_mode = False
         self._retry = 0
 
         self._container = container
@@ -212,7 +211,6 @@ class Environment(object):
 
         The environment will execute all parts of the program that have resulted in a "sink" operation.
         """
-        self._local_mode = local
         self._optimize_plan()
 
         if self._container.is_planning():
@@ -326,7 +324,6 @@ class Environment(object):
     def _send_parameters(self):
         collect = self._collector.collect
         collect(("dop", self._dop))
-        collect(("mode", self._local_mode))
         collect(("retry", self._retry))
         collect(("id", self._env_id))
 


[4/7] flink git commit: [FLINK-9589][py] Make PythonOperationInfo immutable

Posted by ch...@apache.org.
[FLINK-9589][py] Make PythonOperationInfo immutable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fc64994
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fc64994
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fc64994

Branch: refs/heads/master
Commit: 2fc6499496296cf2aa2408e4e15d684496a169a2
Parents: eaff4da
Author: zentol <ch...@apache.org>
Authored: Thu Jun 14 18:02:49 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 19:46:50 2018 +0200

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   | 66 +++++++++++---------
 .../flink/python/api/PythonPlanBinder.java      | 23 ++++---
 2 files changed, 48 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fc64994/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 42eeffb..6d7f402 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -20,7 +20,10 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
 
@@ -28,29 +31,29 @@ import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
  * Generic container for all information required to an operation to the DataSet API.
  */
 public class PythonOperationInfo {
-	public String identifier;
-	public int parentID; //DataSet that an operation is applied on
-	public int otherID; //secondary DataSet
-	public int setID; //ID for new DataSet
-	public String[] keys;
-	public String[] keys1; //join/cogroup keys
-	public String[] keys2; //join/cogroup keys
-	public TypeInformation<?> types; //typeinformation about output type
-	public Object[] values;
-	public int count;
-	public String field;
-	public Order order;
-	public String path;
-	public String fieldDelimiter;
-	public String lineDelimiter;
-	public long frm;
-	public long to;
-	public WriteMode writeMode;
-	public boolean toError;
-	public String name;
-	public boolean usesUDF;
-	public int parallelism;
-	public int envID;
+	public final String identifier;
+	public final int parentID; //DataSet that an operation is applied on
+	public final int otherID; //secondary DataSet
+	public final int setID; //ID for new DataSet
+	public final List<String> keys;
+	public final List<String> keys1; //join/cogroup keys
+	public final List<String> keys2; //join/cogroup keys
+	public final TypeInformation<?> types; //typeinformation about output type
+	public final List<Object> values;
+	public final int count;
+	public final String field;
+	public final Order order;
+	public final String path;
+	public final String fieldDelimiter;
+	public final String lineDelimiter;
+	public final long frm;
+	public final long to;
+	public final WriteMode writeMode;
+	public final boolean toError;
+	public final String name;
+	public final boolean usesUDF;
+	public final int parallelism;
+	public final int envID;
 
 	public PythonOperationInfo(PythonPlanStreamer streamer, int environmentID) throws IOException {
 		identifier = (String) streamer.getRecord();
@@ -75,9 +78,9 @@ public class PythonOperationInfo {
 				order = Order.NONE;
 				break;
 		}
-		keys = normalizeKeys(streamer.getRecord(true));
-		keys1 = normalizeKeys(streamer.getRecord(true));
-		keys2 = normalizeKeys(streamer.getRecord(true));
+		keys = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
+		keys1 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
+		keys2 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
 		Object tmpType = streamer.getRecord();
 		types = tmpType == null ? null : getForObject(tmpType);
 		usesUDF = (Boolean) streamer.getRecord();
@@ -94,10 +97,11 @@ public class PythonOperationInfo {
 		toError = (Boolean) streamer.getRecord();
 		count = (Integer) streamer.getRecord(true);
 		int valueCount = (Integer) streamer.getRecord(true);
-		values = new Object[valueCount];
+		List<Object> valueList = new ArrayList<>(valueCount);
 		for (int x = 0; x < valueCount; x++) {
-			values[x] = streamer.getRecord();
+			valueList.add(streamer.getRecord());
 		}
+		values = valueList;
 		parallelism = (Integer) streamer.getRecord(true);
 
 		envID = environmentID;
@@ -111,9 +115,9 @@ public class PythonOperationInfo {
 		sb.append("OtherID: ").append(otherID).append("\n");
 		sb.append("Name: ").append(name).append("\n");
 		sb.append("Types: ").append(types).append("\n");
-		sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n");
-		sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n");
-		sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
+		sb.append("Keys1: ").append(keys1).append("\n");
+		sb.append("Keys2: ").append(keys2).append("\n");
+		sb.append("Keys: ").append(keys).append("\n");
 		sb.append("Count: ").append(count).append("\n");
 		sb.append("Field: ").append(field).append("\n");
 		sb.append("Order: ").append(order.toString()).append("\n");

http://git-wip-us.apache.org/repos/asf/flink/blob/2fc64994/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index a4f38fe..1df15a5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -55,6 +55,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -422,7 +423,7 @@ public class PythonPlanBinder {
 	}
 
 	private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
-		sets.add(info.setID, env.fromElements(info.values).setParallelism(info.parallelism).name("ValueSource")
+		sets.add(info.setID, env.fromCollection(info.values).setParallelism(info.parallelism).name("ValueSource")
 			.map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep"));
 	}
 
@@ -470,7 +471,7 @@ public class PythonPlanBinder {
 	private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
 		DataSet<Tuple2<K, byte[]>> op = sets.getDataSet(info.parentID);
 		DataSet<byte[]> result = op
-			.distinct(info.keys).setParallelism(info.parallelism).name("Distinct")
+			.distinct(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism).name("Distinct")
 			.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep");
 		sets.add(info.setID, result);
 	}
@@ -495,13 +496,13 @@ public class PythonPlanBinder {
 
 	private void createGroupOperation(PythonOperationInfo info) {
 		DataSet<?> op1 = sets.getDataSet(info.parentID);
-		sets.add(info.setID, op1.groupBy(info.keys));
+		sets.add(info.setID, op1.groupBy(info.keys.toArray(new String[info.keys.size()])));
 	}
 
 	private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
 		DataSet<Tuple2<K, byte[]>> op1 = sets.getDataSet(info.parentID);
 		DataSet<byte[]> result = op1
-			.partitionByHash(info.keys).setParallelism(info.parallelism)
+			.partitionByHash(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism)
 			.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep");
 		sets.add(info.setID, result);
 	}
@@ -530,8 +531,8 @@ public class PythonPlanBinder {
 	private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
 		DataSet<IN1> op1 = sets.getDataSet(info.parentID);
 		DataSet<IN2> op2 = sets.getDataSet(info.otherID);
-		Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1, op1.getType());
-		Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2, op2.getType());
+		Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1.toArray(new String[info.keys1.size()]), op1.getType());
+		Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2.toArray(new String[info.keys2.size()]), op2.getType());
 		PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(operatorConfig, info.envID, info.setID, type);
 		sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(info.parallelism));
 	}
@@ -623,19 +624,21 @@ public class PythonPlanBinder {
 		}
 	}
 
-	private <IN1, IN2> DataSet<Tuple2<byte[], byte[]>> createDefaultJoin(DataSet<IN1> op1, DataSet<IN2> op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, int parallelism) {
+	private <IN1, IN2> DataSet<Tuple2<byte[], byte[]>> createDefaultJoin(DataSet<IN1> op1, DataSet<IN2> op2, List<String> firstKeys, List<String> secondKeys, DatasizeHint mode, int parallelism) {
+		String[] firstKeysArray = firstKeys.toArray(new String[firstKeys.size()]);
+		String[] secondKeysArray = secondKeys.toArray(new String[secondKeys.size()]);
 		switch (mode) {
 			case NONE:
 				return op1
-					.join(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.join(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
 					.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			case HUGE:
 				return op1
-					.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.joinWithHuge(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
 					.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			case TINY:
 				return op1
-					.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.joinWithTiny(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
 					.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			default:
 				throw new IllegalArgumentException("Invalid join mode specified.");
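
The pattern applied throughout this commit: wrap the arrays read from the plan stream in an unmodifiable list, and convert back to an array only at the DataSet API boundary, which still expects String[]. A minimal standalone sketch (hypothetical names, not the Flink classes):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class KeysSketch {
        public static void main(String[] args) {
            String[] raw = {"f0", "f1"};
            // read-only view: keys.add("f2") would throw UnsupportedOperationException
            List<String> keys = Collections.unmodifiableList(Arrays.asList(raw));
            // convert back to the array form that groupBy(String...) and friends expect
            String[] forApi = keys.toArray(new String[keys.size()]);
            System.out.println(Arrays.toString(forApi));
        }
    }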


[2/7] flink git commit: [FLINK-9257][tests] Fix wrong "All tests pass" message

Posted by ch...@apache.org.
[FLINK-9257][tests] Fix wrong "All tests pass" message

This closes #6053.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45ac85e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45ac85e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45ac85e2

Branch: refs/heads/master
Commit: 45ac85e2c4ec66ff02e4277c1781247710252a26
Parents: 03d77b8
Author: Florian Schmidt <fl...@icloud.com>
Authored: Tue May 22 11:36:55 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 19:34:00 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/run-nightly-tests.sh     | 211 +++++--------------
 flink-end-to-end-tests/run-pre-commit-tests.sh  |  74 ++-----
 flink-end-to-end-tests/test-scripts/common.sh   |  79 ++-----
 .../test-scripts/elasticsearch-common.sh        |   1 -
 .../test-scripts/test-runner-common.sh          |  76 +++++++
 .../test-scripts/test_batch_allround.sh         |   3 -
 flink-end-to-end-tests/test-scripts/test_ha.sh  |  12 +-
 .../test_high_parallelism_iterations.sh         |   3 -
 .../test_local_recovery_and_scheduling.sh       |  22 +-
 .../test-scripts/test_quickstarts.sh            |   2 -
 .../test_resume_externalized_checkpoints.sh     |   5 -
 .../test-scripts/test_resume_savepoint.sh       |   3 -
 .../test-scripts/test_shaded_hadoop_s3a.sh      |   3 -
 .../test-scripts/test_shaded_presto_s3.sh       |   3 -
 .../test_stateful_stream_job_upgrade.sh         |   3 -
 .../test-scripts/test_streaming_bucketing.sh    |   3 -
 .../test-scripts/test_streaming_classloader.sh  |  11 +-
 ...test_streaming_distributed_cache_via_blob.sh |   2 +-
 .../test-scripts/test_streaming_kafka010.sh     |   6 -
 .../test-scripts/test_streaming_sql.sh          |   3 -
 20 files changed, 187 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 46b2609..cf70558 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -25,180 +25,65 @@ if [ -z "$END_TO_END_DIR" ] ; then
     exit 1  # fail
 fi
 
-export END_TO_END_DIR="${END_TO_END_DIR}"
+export END_TO_END_DIR
 
 if [ -z "$FLINK_DIR" ] ; then
     echo "You have to export the Flink distribution directory as FLINK_DIR"
     exit 1
 fi
 
-source "$(dirname "$0")"/test-scripts/common.sh
+source "${END_TO_END_DIR}/test-scripts/test-runner-common.sh"
 
 FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd )`" # absolutized and normalized
 
 echo "flink-end-to-end-test directory: $END_TO_END_DIR"
 echo "Flink distribution directory: $FLINK_DIR"
 
-EXIT_CODE=0
-
 # Template for adding a test:
 
-# if [ $EXIT_CODE == 0 ]; then
-#    run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>"
-#    EXIT_CODE=$?
-# fi
-
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test \
-    "Elasticsearch (v1.7.1) sink end-to-end test" \
-    "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test \
-    "Elasticsearch (v2.3.5) sink end-to-end test" \
-    "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test \
-    "Elasticsearch (v5.1.2) sink end-to-end test" \
-    "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Local recovery and sticky scheduling nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
-  EXIT_CODE=$?
-fi
-
-# Exit code for Travis build success/failure
-exit $EXIT_CODE
+# run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>"
+
+run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
+run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
+run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
+run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
+
+run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
+run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
+run_test "Resuming Savepoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true"
+run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false"
+run_test "Resuming Savepoint (file, async, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true"
+run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false"
+run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks"
+run_test "Resuming Savepoint (rocks, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
+run_test "Resuming Savepoint (rocks, scale down) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
+
+run_test "Resuming Externalized Checkpoint (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true"
+run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false"
+run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks"
+
+run_test "Resuming Externalized Checkpoint after terminal failure (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true true"
+run_test "Resuming Externalized Checkpoint after terminal failure (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file false true"
+run_test "Resuming Externalized Checkpoint after terminal failure (rocks) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks true"
+
+run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"
+run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh"
+run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
+
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false true"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false true"
+run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true true"
+
+run_test "Elasticsearch (v1.7.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
+run_test "Elasticsearch (v2.3.5) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
+run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+
+run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
+run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
+
+printf "\n[PASS] All tests passed\n"
+exit 0

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/run-pre-commit-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh
index 4d63397..6355fd0 100755
--- a/flink-end-to-end-tests/run-pre-commit-tests.sh
+++ b/flink-end-to-end-tests/run-pre-commit-tests.sh
@@ -25,74 +25,32 @@ if [ -z "$END_TO_END_DIR" ] ; then
     exit 1  # fail
 fi
 
-export END_TO_END_DIR="${END_TO_END_DIR}"
+export END_TO_END_DIR
 
 if [ -z "$FLINK_DIR" ] ; then
     echo "You have to export the Flink distribution directory as FLINK_DIR"
     exit 1
 fi
 
-source "$(dirname "$0")"/test-scripts/common.sh
+source ${END_TO_END_DIR}/test-scripts/test-runner-common.sh
 
 FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd )`" # absolutized and normalized
 
 echo "flink-end-to-end-test directory: $END_TO_END_DIR"
 echo "Flink distribution directory: $FLINK_DIR"
 
-EXIT_CODE=0
-
 # Template for adding a test:
-
-# if [ $EXIT_CODE == 0 ]; then
-#    run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>"
-#    EXIT_CODE=$?
-# fi
-
-if [ $EXIT_CODE == 0 ]; then
-    run_test "Batch Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_python_wordcount.sh"
-    EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-    run_test "Streaming Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_python_wordcount.sh"
-    EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-    run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh"
-    EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-    run_test "Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka010.sh"
-    EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-    run_test "class loading end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_classloader.sh"
-    EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-    run_test "Shaded Hadoop S3A end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_hadoop_s3a.sh"
-    EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-    run_test "Shaded Presto S3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_presto_s3.sh"
-    EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-    run_test "Hadoop-free Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_hadoop_free.sh"
-    EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-    run_test "Distributed cache end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh"
-    EXIT_CODE=$?
-fi
-
-
-# Exit code for Travis build success/failure
-exit $EXIT_CODE
+# run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>"
+
+run_test "Batch Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_python_wordcount.sh"
+run_test "Streaming Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_python_wordcount.sh"
+run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh"
+run_test "Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka010.sh"
+run_test "class loading end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_classloader.sh"
+run_test "Shaded Hadoop S3A end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_hadoop_s3a.sh"
+run_test "Shaded Presto S3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_presto_s3.sh"
+run_test "Hadoop-free Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_hadoop_free.sh"
+run_test "Distributed cache end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh"
+
+printf "\n[PASS] All tests passed\n"
+exit 0

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 18833cc..3498b56 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -24,7 +24,7 @@ if [[ -z $FLINK_DIR ]]; then
     exit 1
 fi
 
-export PASS=1
+export EXIT_CODE=0
 
 echo "Flink dist directory: $FLINK_DIR"
 
@@ -138,7 +138,6 @@ function start_local_zk {
 
             if [ "${address}" != "localhost" ]; then
                 echo "[ERROR] Parse error. Only available for localhost."
-                PASS=""
                 exit 1
             fi
             ${FLINK_DIR}/bin/zookeeper.sh start $id
@@ -168,14 +167,7 @@ function start_cluster {
   done
 }
 
-function stop_cluster {
-  "$FLINK_DIR"/bin/stop-cluster.sh
-
-  # stop zookeeper only if there are processes running
-  if ! [ "`jps | grep 'FlinkZooKeeperQuorumPeer' | wc -l`" = "0" ]; then
-    "$FLINK_DIR"/bin/zookeeper.sh stop
-  fi
-
+function check_logs_for_errors {
   if grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
       | grep -v "RetriableCommitFailedException" \
       | grep -v "NoAvailableBrokersException" \
@@ -193,8 +185,11 @@ function stop_cluster {
       | grep -iq "error"; then
     echo "Found error in log files:"
     cat $FLINK_DIR/log/*
-    PASS=""
+    EXIT_CODE=1
   fi
+}
+
+function check_logs_for_exceptions {
   if grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
       | grep -v "RetriableCommitFailedException" \
       | grep -v "NoAvailableBrokersException" \
@@ -215,13 +210,24 @@ function stop_cluster {
       | grep -iq "exception"; then
     echo "Found exception in log files:"
     cat $FLINK_DIR/log/*
-    PASS=""
+    EXIT_CODE=1
   fi
+}
 
+function check_logs_for_non_empty_out_files {
   if grep -ri "." $FLINK_DIR/log/*.out > /dev/null; then
     echo "Found non-empty .out files:"
     cat $FLINK_DIR/log/*.out
-    PASS=""
+    EXIT_CODE=1
+  fi
+}
+
+function stop_cluster {
+  "$FLINK_DIR"/bin/stop-cluster.sh
+
+  # stop zookeeper only if there are processes running
+  if ! [ "`jps | grep 'FlinkZooKeeperQuorumPeer' | wc -l`" = "0" ]; then
+    "$FLINK_DIR"/bin/zookeeper.sh stop
   fi
 }
 
@@ -281,24 +287,15 @@ function check_result_hash {
   if [[ "$actual" != "$expected" ]]
   then
     echo "FAIL $name: Output hash mismatch.  Got $actual, expected $expected."
-    PASS=""
     echo "head hexdump of actual:"
     head $outfile_prefix* | hexdump -c
+    exit 1
   else
     echo "pass $name"
     # Output files are left behind in /tmp
   fi
 }
 
-function check_all_pass {
-  if [[ ! "$PASS" ]]
-  then
-    echo "One or more tests FAILED."
-    exit 1
-  fi
-  echo "All tests PASS"
-}
-
 function s3_put {
   local_file=$1
   bucket=$2
@@ -458,37 +455,5 @@ function start_timer {
 # prints the number of minutes and seconds that have elapsed since the last call to start_timer
 function end_timer {
     duration=$SECONDS
-    echo "$(($duration / 60)) minutes and $(($duration % 60)) seconds elapsed."
-}
-
-#######################################
-# Prints the given description, runs the given test and prints how long the execution took.
-# Arguments:
-#   $1: description of the test
-#   $2: command to execute
-#######################################
-function run_test {
-    description="$1"
-    command="$2"
-
-    printf "\n==============================================================================\n"
-    printf "Running ${description}\n"
-    printf "==============================================================================\n"
-    start_timer
-    ${command}
-    exit_code="$?"
-    end_timer
-    return "${exit_code}"
-}
-
-# Shuts down the cluster and cleans up all temporary folders and files. Make sure to clean up even in case of failures.
-function cleanup {
-  stop_cluster
-  tm_kill_all
-  jm_kill_all
-  rm -rf $TEST_DATA_DIR 2> /dev/null
-  revert_default_config
-  check_all_pass
-  rm -rf $FLINK_DIR/log/* 2> /dev/null
-}
-trap cleanup EXIT
+    echo "$(($duration / 60)) minutes and $(($duration % 60)) seconds"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 87ffa82..7b627fe 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -47,7 +47,6 @@ function verify_elasticsearch_process_exist {
     # make sure the elasticsearch node is actually running
     if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
       echo "Elasticsearch node is not running."
-      PASS=""
       exit 1
     else
       echo "Elasticsearch node is running."

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test-runner-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test-runner-common.sh b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
new file mode 100644
index 0000000..eeae0f7
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
@@ -0,0 +1,76 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "${END_TO_END_DIR}"/test-scripts/common.sh
+
+#######################################
+# Prints the given description, runs the given test and prints how long the execution took.
+# Arguments:
+#   $1: description of the test
+#   $2: command to execute
+#######################################
+function run_test {
+    description="$1"
+    command="$2"
+
+    printf "\n==============================================================================\n"
+    printf "Running '${description}'\n"
+    printf "==============================================================================\n"
+    start_timer
+    ${command}
+    exit_code="$?"
+    time_elapsed=$(end_timer)
+
+    check_logs_for_errors
+    check_logs_for_exceptions
+    check_logs_for_non_empty_out_files
+
+    cleanup
+
+    if [[ ${exit_code} == 0 ]]; then
+        if [[ ${EXIT_CODE} != 0 ]]; then
+            printf "\n[FAIL] '${description}' failed after ${time_elapsed}! Test exited with exit code 0 but the logs contained errors, exceptions or non-empty .out files\n\n"
+            exit_code=1
+        else
+            printf "\n[PASS] '${description}' passed after ${time_elapsed}! Test exited with exit code 0.\n\n"
+        fi
+    else
+        if [[ ${EXIT_CODE} != 0 ]]; then
+            printf "\n[FAIL] '${description}' failed after ${time_elapsed}! Test exited with exit code ${exit_code} and the logs contained errors, exceptions or non-empty .out files\n\n"
+        else
+            printf "\n[FAIL] '${description}' failed after ${time_elapsed}! Test exited with exit code ${exit_code}\n\n"
+        fi
+    fi
+
+    if [[ ${exit_code} != 0 ]]; then
+        exit "${exit_code}"
+    fi
+}
+
+# Shuts down the cluster and cleans up all temporary folders and files. Make sure to clean up even in case of failures.
+function cleanup {
+  stop_cluster
+  tm_kill_all
+  jm_kill_all
+  rm -rf $TEST_DATA_DIR 2> /dev/null
+  revert_default_config
+  rm -rf $FLINK_DIR/log/* 2> /dev/null
+}
+
+trap cleanup EXIT
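
A suite driver is expected to source this file and register each test through run_test with a description and the script to execute, exactly as the driver script at the top of this commit does. A minimal sketch, assuming END_TO_END_DIR is already exported by the surrounding environment:

    #!/usr/bin/env bash
    source "${END_TO_END_DIR}"/test-scripts/test-runner-common.sh

    # each entry runs one end-to-end test; run_test exits the driver on the first failure
    run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh"

    printf "\n[PASS] All tests passed\n"
    exit 0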

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 0cbaa21..1cb4484 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -44,9 +44,6 @@ function test_cleanup {
 
   # revert our modifications to the Flink distribution
   mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap test_cleanup INT
 trap test_cleanup EXIT

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_ha.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_ha.sh b/flink-end-to-end-tests/test-scripts/test_ha.sh
index 5714161..a44a139 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha.sh
@@ -42,7 +42,6 @@ function stop_cluster_and_watchdog() {
             wait ${TM_WATCHDOG_PID} 2> /dev/null
         fi
 
-        cleanup
         CLEARED=1
     fi
 }
@@ -50,28 +49,29 @@ function stop_cluster_and_watchdog() {
 function verify_logs() {
     local OUTPUT=$FLINK_DIR/log/*.out
     local JM_FAILURES=$1
+    local EXIT_CODE=0
 
     # verify that we have no alerts
     if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
         echo "FAILURE: Alerts found at the general purpose DataStream job."
-        PASS=""
+        EXIT_CODE=1
     fi
 
     # checks that all apart from the first JM recover the failed jobgraph.
     if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
         echo "FAILURE: A JM did not take over."
-        PASS=""
+        EXIT_CODE=1
     fi
 
     # search the logs for JMs that log completed checkpoints
     if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
         echo "FAILURE: A JM did not execute the job."
-        PASS=""
+        EXIT_CODE=1
     fi
 
-    if [[ ! "$PASS" ]]; then
+    if [[ $EXIT_CODE != 0 ]]; then
         echo "One or more tests FAILED."
-        exit 1
+        exit $EXIT_CODE
     fi
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
index 99f8017..dbdacef 100755
--- a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
+++ b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
@@ -60,9 +60,6 @@ function test_cleanup {
 
   # revert our modifications to the Flink distribution
   mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap test_cleanup INT
 trap test_cleanup EXIT

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
index 13e7616..5e73591 100755
--- a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
+++ b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
@@ -34,14 +34,14 @@ function check_logs {
 
     if [ ${failed_local_recovery} -ne 0 ]
     then
-        PASS=""
         echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for local recovery of correctly scheduled task(s)."
+        exit 1
     fi
 
     if [ ${attempt_local_recovery} -eq 0 ]
     then
-        PASS=""
         echo "FAILURE: Found no attempt for local recovery. Configuration problem?"
+        exit 1
     fi
 }
 
@@ -53,8 +53,6 @@ function cleanup_after_test {
     #
     kill ${watchdog_pid} 2> /dev/null
     wait ${watchdog_pid} 2> /dev/null
-    #
-    cleanup
 }
 
 # Calls the cleanup step for this tests and exits with an error.
@@ -71,9 +69,14 @@ function run_local_recovery_test {
     local incremental=$4
     local kill_jvm=$5
 
-    echo "Running local recovery test on ${backend} backend: incremental checkpoints = ${incremental}, kill JVM = ${kill_jvm}."
-    TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
+    echo "Running local recovery test with configuration:
+        parallelism: ${parallelism}
+        max attempts: ${max_attempts}
+        backend: ${backend}
+        incremental checkpoints: ${incremental}
+        kill JVM: ${kill_jvm}"
 
+    TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
     # Backup conf and configure for HA
     backup_config
     create_ha_config
@@ -111,11 +114,6 @@ function run_local_recovery_test {
 
 ## MAIN
 trap cleanup_after_test_and_exit_fail EXIT
-run_local_recovery_test 4 3 "file" "false" "false"
-run_local_recovery_test 4 3 "file" "false" "true"
-run_local_recovery_test 4 10 "rocks" "false" "false"
-run_local_recovery_test 4 10 "rocks" "true" "false"
-run_local_recovery_test 4 10 "rocks" "false" "true"
-run_local_recovery_test 4 10 "rocks" "true" "true"
+run_local_recovery_test "$@"
 trap - EXIT
 exit 0
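
Since run_local_recovery_test now reads its configuration from the script arguments, each backend/flag combination is expected to be registered by the caller as a separate invocation. A hedged sketch of one such call, using the argument order parallelism, max attempts, backend, incremental checkpoints, kill JVM (the values mirror one of the previously hard-coded combinations):

    $END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks true false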

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
index 9657938..43d2d21 100755
--- a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
+++ b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
@@ -81,7 +81,6 @@ if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
     echo "Success: There are no flink core classes are contained in the jar."
 else
     echo "Failure: There are flink core classes are contained in the jar."
-    PASS=""
     exit 1
 fi
 
@@ -90,7 +89,6 @@ if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.tx
       `grep -c "org/apache/flink/streaming/connectors/elasticsearch5" contentsInJar.txt` -eq '0' ]]; then
 
     echo "Failure: Since Elasticsearch5SinkExample.class and other user classes are not included in the jar. "
-    PASS=""
     exit 1
 else
     echo "Success: Elasticsearch5SinkExample.class and other user classes are included in the jar."

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 5cb2d8e..6472b23 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -34,9 +34,6 @@ function test_cleanup {
   trap "" EXIT
 
   rollback_flink_slf4j_metric_reporter
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap test_cleanup INT
 trap test_cleanup EXIT
@@ -91,14 +88,12 @@ CHECKPOINT_PATH=$(ls -d $CHECKPOINT_DIR/$DATASTREAM_JOB/chk-[1-9]*)
 
 if [ -z $CHECKPOINT_PATH ]; then
   echo "Expected an externalized checkpoint to be present, but none exists."
-  PASS=""
   exit 1
 fi
 
 NUM_CHECKPOINTS=$(echo $CHECKPOINT_PATH | wc -l | tr -d ' ')
 if (( $NUM_CHECKPOINTS > 1 )); then
   echo "Expected only exactly 1 externalized checkpoint to be present, but $NUM_CHECKPOINTS exists."
-  PASS=""
   exit 1
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index 084cf51..1651ab0 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -69,9 +69,6 @@ function test_cleanup {
 
   # revert our modifications to the Flink distribution
   rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap test_cleanup INT
 trap test_cleanup EXIT

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh b/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
index a989488..e5ac5bc 100755
--- a/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
+++ b/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
@@ -37,9 +37,6 @@ function s3_cleanup {
   # remove any leftover settings
   sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
   sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap s3_cleanup EXIT
 

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh b/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
index 559f1fe..4092805 100755
--- a/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
+++ b/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
@@ -33,9 +33,6 @@ s3_put $TEST_INFRA_DIR/test-data/words $ARTIFACTS_AWS_BUCKET flink-end-to-end-te
 function s3_cleanup {
   s3_delete $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-presto-s3
   rm $FLINK_DIR/lib/flink-s3-fs*.jar
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap s3_cleanup EXIT
 

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh b/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
index 3ecda5a..f7c987e 100755
--- a/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
+++ b/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
@@ -43,9 +43,6 @@ function test_cleanup {
 
   # revert our modifications to the Flink distribution
   rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap test_cleanup INT
 trap test_cleanup EXIT

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
index 10548d8..e871916 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
@@ -36,9 +36,6 @@ function bucketing_cleanup() {
 
   # restore default logging level
   sed -i -e 's/log4j.logger.org.apache.flink=DEBUG/#log4j.logger.org.apache.flink=INFO/g' $FLINK_DIR/conf/log4j.properties
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap bucketing_cleanup INT
 trap bucketing_cleanup EXIT

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_streaming_classloader.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_classloader.sh b/flink-end-to-end-tests/test-scripts/test_streaming_classloader.sh
index ce259a0..f71557f 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_classloader.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_classloader.sh
@@ -18,6 +18,7 @@
 ################################################################################
 
 source "$(dirname "$0")"/common.sh
+EXIT_CODE=0
 
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-parent-child-classloading-test/target/ClassLoaderTestProgram.jar
 
@@ -51,7 +52,7 @@ if [[ "$OUTPUT" != "$EXPECTED" ]]; then
   echo "Output from Flink program does not match expected output."
   echo -e "EXPECTED: $EXPECTED"
   echo -e "ACTUAL: $OUTPUT"
-  PASS=""
+  EXIT_CODE=1
 fi
 
 # This verifies that Flink classes are still resolved from the parent because the default
@@ -81,7 +82,7 @@ if [[ "$OUTPUT" != "$EXPECTED" ]]; then
   echo "Output from Flink program does not match expected output."
   echo -e "EXPECTED: $EXPECTED"
   echo -e "ACTUAL: $OUTPUT"
-  PASS=""
+  EXIT_CODE=1
 fi
 
 echo "Testing child-first class loading"
@@ -110,5 +111,9 @@ if [[ "$OUTPUT" != "$EXPECTED" ]]; then
   echo "Output from Flink program does not match expected output."
   echo -e "EXPECTED: $EXPECTED"
   echo -e "ACTUAL: $OUTPUT"
-  PASS=""
+  EXIT_CODE=1
+fi
+
+if [[ ${EXIT_CODE} != 0 ]]; then
+    exit ${EXIT_CODE}
 fi
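
The classloader test now records failures in a script-local EXIT_CODE and exits non-zero only after all three class-loading configurations have produced their output, instead of relying on the removed global PASS flag. A generic sketch of this accumulate-then-exit pattern; check_parent_first and check_child_first are placeholder functions, not part of the test:

    EXIT_CODE=0
    check_parent_first || EXIT_CODE=1   # remember the failure instead of exiting immediately
    check_child_first  || EXIT_CODE=1
    if [[ ${EXIT_CODE} != 0 ]]; then
        exit ${EXIT_CODE}
    fi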

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh b/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
index 1d814ed..4c1229d 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
@@ -36,5 +36,5 @@ if [[ "$OUTPUT" != "$EXPECTED" ]]; then
   echo "Output from Flink program does not match expected output."
   echo -e "EXPECTED: $EXPECTED"
   echo -e "ACTUAL: $OUTPUT"
-  PASS=""
+  exit 1
 fi

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index abc6186..c9cc19d 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -39,9 +39,6 @@ function test_cleanup {
 
   # revert our modifications to the Flink distribution
   mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap test_cleanup INT
 trap test_cleanup EXIT
@@ -64,7 +61,6 @@ function verify_output {
     echo "Output from Flink program does not match expected output."
     echo -e "EXPECTED FOR KEY: --$expected--"
     echo -e "ACTUAL: --$2--"
-    PASS=""
     exit 1
   fi
 }
@@ -90,7 +86,6 @@ modify_num_partitions test-input 2
 
 if (( $(get_num_partitions test-input) != 2 )); then
   echo "Failed adding a partition to test-input topic."
-  PASS=""
   exit 1
 fi
 
@@ -101,7 +96,6 @@ send_messages_to_kafka "elephant,13,64213\ngiraffe,9,65555\nbee,5,65647\nsquirre
 # verify that our assumption that the new partition actually has written messages is correct
 if (( $(get_partition_end_offset test-input 1) == 0 )); then
   echo "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified."
-  PASS=""
   exit 1
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/45ac85e2/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh b/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh
index 391b623..5aca0e5 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh
@@ -42,9 +42,6 @@ function sql_cleanup() {
 
   # remove flink-table from lib folder
   rm $FLINK_DIR/lib/flink-table*jar
-
-  # make sure to run regular cleanup as well
-  cleanup
 }
 trap sql_cleanup INT
 trap sql_cleanup EXIT


[3/7] flink git commit: [FLINK-8744][docs] Add CommonOption annotation

Posted by ch...@apache.org.
[FLINK-8744][docs] Add CommonOption annotation

This closes #5843.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eaff4da1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eaff4da1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eaff4da1

Branch: refs/heads/master
Commit: eaff4da15bdd7528dcc0d8a37fd59642cee53850
Parents: 45ac85e
Author: zentol <ch...@apache.org>
Authored: Thu Apr 12 13:27:09 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 19:45:16 2018 +0200

----------------------------------------------------------------------
 docs/_includes/generated/common_section.html    |  61 ++++++
 docs/ops/config.md                              | 210 +------------------
 .../flink/annotation/docs/Documentation.java    |  19 ++
 .../configuration/CheckpointingOptions.java     |   5 +
 .../apache/flink/configuration/CoreOptions.java |   1 +
 .../configuration/HighAvailabilityOptions.java  |   3 +
 .../flink/configuration/JobManagerOptions.java  |   2 +
 .../flink/configuration/SecurityOptions.java    |   2 +
 .../flink/configuration/TaskManagerOptions.java |   3 +
 .../ConfigOptionsDocGenerator.java              |  52 ++++-
 .../ConfigOptionsDocGeneratorTest.java          |  48 +++++
 .../ConfigOptionsDocsCompletenessITCase.java    |  70 +++++--
 .../configuration/data/TestCommonOptions.java   |  46 ++++
 13 files changed, 291 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/docs/_includes/generated/common_section.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/common_section.html b/docs/_includes/generated/common_section.html
new file mode 100644
index 0000000..6ad3bef
--- /dev/null
+++ b/docs/_includes/generated/common_section.html
@@ -0,0 +1,61 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>jobmanager.heap.mb</h5></td>
+            <td style="word-wrap: break-word;">1024</td>
+            <td>JVM heap size (in megabytes) for the JobManager.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.heap.mb</h5></td>
+            <td style="word-wrap: break-word;">1024</td>
+            <td>JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td>
+        </tr>
+        <tr>
+            <td><h5>parallelism.default</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.numberOfTaskSlots</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The state backend to be used to store and checkpoint state.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.dir</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).</td>
+        </tr>
+        <tr>
+            <td><h5>state.savepoints.dir</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).</td>
+        </tr>
+        <tr>
+            <td><h5>high-availability</h5></td>
+            <td style="word-wrap: break-word;">"NONE"</td>
+            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER".</td>
+        </tr>
+        <tr>
+            <td><h5>high-availability.storageDir</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>File system path (URI) where Flink persists metadata in high-availability setups.</td>
+        </tr>
+        <tr>
+            <td><h5>security.ssl.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 9030bf7..1e6be19 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -40,215 +40,7 @@ The configuration files for the TaskManagers can be different, Flink does not as
 
 ## Common Options
 
-- `env.java.home`: The path to the Java installation to use (DEFAULT: system's default Java installation, if found). Needs to be specified if the startup scripts fail to automatically resolve the java home directory. Can be specified to point to a specific java installation or version. If this option is not specified, the startup scripts also evaluate the `$JAVA_HOME` environment variable.
-
-- `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts, both JobManager and
-TaskManager, and Flink's YARN client. This can be used to set different garbage collectors or to include remote
-debuggers into the JVMs running Flink's services. Enclosing options in double quotes delays parameter substitution
-allowing access to variables from Flink's startup scripts. Use `env.java.opts.jobmanager` and `env.java.opts.taskmanager`
-for JobManager or TaskManager-specific options, respectively.
-
-- `env.java.opts.jobmanager`: JobManager-specific JVM options. These are used in addition to the regular `env.java.opts`.
-
-- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`.
-
-- `jobmanager.rpc.address`: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). **Note:** The address (host name or IP) should be accessible by all nodes including the client.
-
-- `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123).
-
-- `jobmanager.heap.mb`: JVM heap size (in megabytes) for the JobManager. You may have to increase the heap size for the JobManager if you are running very large applications (with many operators), or if you are keeping a long history of them.
-
-- `taskmanager.heap.mb`: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible. If the cluster is exclusively running Flink, the total amount of available memory per machine minus some memory for the operating system (maybe 1-2 GB) is a good value. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.
-
-- `taskmanager.numberOfTaskSlots`: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores). [More about task slots](config.html#configuring-taskmanager-processing-slots).
-
-- `parallelism.default`: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution. **Note**: The default parallelism can be overwritten for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten for single transformations by calling `setParallelism(int
-parallelism)` on an operator. See [Parallel Execution]({{site.baseurl}}/dev/parallel.html) for more information about parallelism.
-
-- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
-By default, this is set to `file:///` which points to the local filesystem. This means that the local
-filesystem is going to be used to search for user-specified files **without** an explicit scheme
-definition. As another example, if this is set to `hdfs://localhost:9000/`, then a user-specified file path
-without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to be transformed into
-`hdfs://localhost:9000/user/USERNAME/in.txt`. This scheme is used **ONLY** if no other scheme is specified (explicitly) in the user-provided `URI`.
-
-- `classloader.resolve-order`: Whether Flink should use a child-first `ClassLoader` when loading
-user-code classes or a parent-first `ClassLoader`. Can be one of `parent-first` or `child-first`. (default: `child-first`)
-
-- `classloader.parent-first-patterns.default`: A (semicolon-separated) list of patterns that specifies which
-classes should always be resolved through the parent `ClassLoader` first. A pattern is a simple
-prefix that is checked against the fully qualified class name. By default, this is set to
-`"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback"`.
-To extend this list beyond the default it is recommended to configure `classloader.parent-first-patterns.additional` instead of modifying this setting directly.
-
-- `classloader.parent-first-patterns.additional`: A (semicolon-separated) list of patterns that specifies which
-classes should always be resolved through the parent `ClassLoader` first. A pattern is a simple
-prefix that is checked against the fully qualified class name.
-This list is appended to `classloader.parent-first-patterns.default`.
-
-## Advanced Options
-
-### Compute
-
-- `taskmanager.compute.numa`: When enabled a TaskManager is started on each NUMA node for each worker listed in *conf/slaves* (DEFAULT: false). Note: only supported when deploying Flink as a standalone cluster.
-
-### Managed Memory
-
-By default, Flink allocates a fraction of `0.7` of the free memory (total memory configured via `taskmanager.heap.mb` minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
-
-The default fraction for managed memory can be adjusted using the `taskmanager.memory.fraction` parameter. An absolute value may be set using `taskmanager.memory.size` (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.
-
-- `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on-heap or off-heap (depending on `taskmanager.memory.off-heap`) for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio with respect to the size of the task manager JVM as specified by `taskmanager.memory.fraction`. (DEFAULT: -1)
-
-- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set.
-
-- `taskmanager.memory.off-heap`: If set to `true`, the task manager allocates memory which is used for sorting, hash tables, and caching of intermediate results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory (DEFAULT: false).
-
-- `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
-
-- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. **Note:** For streaming setups, we highly recommend to set this value to `false` as the core state backends currently do not use the managed memory.
-
-### Memory and Performance Debugging
-
-These options are useful for debugging a Flink application for memory and garbage collection related issues, such as performance and out-of-memory process kills or exceptions.
-
-- `taskmanager.debug.memory.startLogThread`: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.
-
-- `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
-
-### Kerberos-based Security
-
-Flink supports Kerberos authentication for the following services:
-
-+ Hadoop Components, such as HDFS, YARN, or HBase *(version 2.6.1 and above; all other versions have critical bugs which might fail the Flink job unexpectedly)*.
-+ Kafka Connectors *(version 0.9+ and above)*.
-+ Zookeeper
-
-Configuring Flink for Kerberos security involves three aspects, explained separately in the following sub-sections.
-
-##### 1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via `kinit`)
-
-To provide the cluster with a Kerberos credential, Flink supports using a Kerberos keytab file or ticket caches managed by `kinit`.
-
-- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from your Kerberos ticket cache (default: `true`).
-
-- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-
-- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-
-If both `security.kerberos.login.keytab` and `security.kerberos.login.principal` have values provided, keytabs will be used for authentication.
-It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues.   If you prefer to use the ticket cache,
-talk to your administrator about increasing the Hadoop delegation token lifetime.
-
-Note that authentication using ticket caches is only supported when deploying Flink as a standalone cluster or on YARN.
-
-##### 2. Making the Kerberos credential available to components and connectors as needed
-
-For Hadoop components, Flink will automatically detect if the configured Kerberos credentials should be used when connecting to HDFS, HBase, and other Hadoop components depending on whether Hadoop security is enabled (in `core-site.xml`).
-
-For any connector or component that uses a JAAS configuration file, make the Kerberos credentials available to them by configuring JAAS login contexts for each one respectively, using the following configuration:
-
-- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication).
-
-This allows enabling Kerberos authentication for different connectors or components independently. For example, you can enable Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa.
-
-You may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html), whose entries will override those produced by the above configuration option.
-
-##### 3. Configuring the component and/or connector to use Kerberos authentication
-
-Finally, be sure to configure the connector within your Flink program or component as necessary to use Kerberos authentication.
-
-Below is a list of currently first-class supported connectors or components by Flink for Kerberos authentication:
-
-- Kafka: see [here]({{site.baseurl}}/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-above-09-only) for details on configuring the Kafka connector to use Kerberos authentication.
-
-- Zookeeper (for HA): see [here]({{site.baseurl}}/ops/jobmanager_high_availability.html#configuring-for-zookeeper-security) for details on Zookeeper security configuration to work with the Kerberos-based security configurations mentioned here.
-
-For more information on how Flink security internally setups Kerberos authentication, please see [here]({{site.baseurl}}/ops/security-kerberos.html).
-
-### Other
-
-- `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir).
-
-- `taskmanager.log.path`: The config parameter defining the taskmanager log file location
-
-- `jobmanager.web.address`: Address of the JobManager's web interface (DEFAULT: anyLocalAddress()).
-
-- `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 8081).
-
-- `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface
-will copy its static files into the directory. Also uploaded job jars are stored in the directory if not overridden. By default, the temporary directory is used.
-
-- `jobmanager.web.upload.dir`: The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory
-will be used under the directory specified by jobmanager.web.tmpdir.
-
-- `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false)
-
-- `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false)
-
-- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)
-
-- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB)
-
-- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB)
-
-- `state.backend`: The backend that will be used to store operator state checkpoints if checkpointing is enabled. Supported backends:
-   -  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
-   -  `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, ...
-
-- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups.
-
-- `state.backend.rocksdb.checkpointdir`:  The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`)
-
-- `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints).
-
-- `state.checkpoints.num-retained`: The number of completed checkpoint instances to retain. Having more than one allows recovery fallback to an earlier checkpoints if the latest checkpoint is corrupt. (Default: 1)
-
-- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
-
-- `blob.storage.directory`: Directory for storing blobs (such as user JARs) on the TaskManagers.
-If not set or empty, Flink will fall back to `taskmanager.tmp.dirs` and select one temp directory
-at random.
-
-- `blob.service.cleanup.interval`: Cleanup interval (in seconds) of transient blobs at server and caches as well as permanent blobs at the caches (DEFAULT: 1 hour).
-Whenever a job is not referenced at the cache anymore, we set a TTL for its permanent blob files and
-let the periodic cleanup task (executed every `blob.service.cleanup.interval` seconds) remove them
-after this TTL has passed. We do the same for transient blob files at both server and caches but
-immediately after accessing them, i.e. an put or get operation.
-This means that a blob will be retained at most <tt>2 * `blob.service.cleanup.interval`</tt> seconds after
-not being referenced anymore (permanent blobs) or their last access (transient blobs). For permanent blobs,
-this means that a recovery still has the chance to use existing files rather downloading them again.
-
-- `blob.server.port`: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.
-
-- `blob.service.ssl.enabled`: Flag to enable ssl for the blob client/server communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true).
-
-- `restart-strategy`: Default [restart strategy]({{site.baseurl}}/dev/restart_strategies.html) to use in case no
-restart strategy has been specified for the job.
-The options are:
-    - fixed delay strategy: `fixed-delay`.
-    - failure rate strategy: `failure-rate`.
-    - no restarts: `none`
-
-    Default value is `none` unless checkpointing is enabled for the job in which case the default is `fixed-delay` with `Integer.MAX_VALUE` restart attempts and `10s` delay.
-
-- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if the default restart strategy is set to "fixed-delay".
-Default value is 1, unless "fixed-delay" was activated by enabling checkpoints, in which case the default is `Integer.MAX_VALUE`.
-
-- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay". (default: `1 s`)
-
-- `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of restarts in given time interval before failing a job in "failure-rate" strategy.
-Default value is 1.
-
-- `restart-strategy.failure-rate.failure-rate-interval`: Time interval for measuring failure rate in "failure-rate" strategy.
-Default value is `1 minute`.
-
-- `restart-strategy.failure-rate.delay`: Delay between restart attempts, used if the default restart strategy is set to "failure-rate".
-Default value is the `akka.ask.timeout`.
-
-- `jobstore.cache-size`: The job store cache size in bytes which is used to keep completed jobs in memory (DEFAULT: `52428800` (`50` MB)).
-
-- `jobstore.expiration-time`: The time in seconds after which a completed job expires and is purged from the job store (DEFAULT: `3600`).
+{% include generated/common_section.html %}
 
 ## Full Reference
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index 4e424c7..c193c4e 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -40,6 +40,25 @@ public final class Documentation {
 		String value();
 	}
 
+	/**
+	 * Annotation used on config option fields to include them in the "Common Options" section.
+	 *
+	 * <p>The {@link CommonOption#position()} argument controls the position in the generated table, with lower values
+	 * being placed at the top. Fields with the same position are sorted alphabetically by key.
+	 */
+	@Target(ElementType.FIELD)
+	@Retention(RetentionPolicy.RUNTIME)
+	@Internal
+	public @interface CommonOption {
+		int POSITION_MEMORY = 10;
+		int POSITION_PARALLELISM_SLOTS = 20;
+		int POSITION_FAULT_TOLERANCE = 30;
+		int POSITION_HIGH_AVAILABILITY = 40;
+		int POSITION_SECURITY = 50;
+
+		int position() default Integer.MAX_VALUE;
+	}
+
 	private Documentation(){
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index 201d11d..60b7613 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.docs.Documentation;
+
 /**
  * A collection of all configuration options that relate to checkpoints
  * and savepoints.
@@ -29,6 +31,7 @@ public class CheckpointingOptions {
 	// ------------------------------------------------------------------------
 
 	/** The state backend to be used to store and checkpoint state. */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_FAULT_TOLERANCE)
 	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
 			.key("state.backend")
 			.noDefaultValue()
@@ -86,6 +89,7 @@ public class CheckpointingOptions {
 
 	/** The default directory for savepoints. Used by the state backends that write
 	 * savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_FAULT_TOLERANCE)
 	public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
 			.key("state.savepoints.dir")
 			.noDefaultValue()
@@ -95,6 +99,7 @@ public class CheckpointingOptions {
 
 	/** The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem.
 	 * The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).*/
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_FAULT_TOLERANCE)
 	public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
 			.key("state.checkpoints.dir")
 			.noDefaultValue()

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 42c1c9c..656943f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -191,6 +191,7 @@ public class CoreOptions {
 	//  program
 	// ------------------------------------------------------------------------
 
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_PARALLELISM_SLOTS)
 	public static final ConfigOption<Integer> DEFAULT_PARALLELISM = ConfigOptions
 		.key("parallelism.default")
 		.defaultValue(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index f9d4ad9..c8b8ae9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.ConfigGroup;
 import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.annotation.docs.Documentation;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -42,6 +43,7 @@ public class HighAvailabilityOptions {
 	 * A value of "NONE" signals no highly available setup.
 	 * To enable high-availability, set this mode to "ZOOKEEPER".
 	 */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
 	public static final ConfigOption<String> HA_MODE =
 			key("high-availability")
 			.defaultValue("NONE")
@@ -63,6 +65,7 @@ public class HighAvailabilityOptions {
 	/**
 	 * File system path (URI) where Flink persists metadata in high-availability setups.
 	 */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
 	public static final ConfigOption<String> HA_STORAGE_PATH =
 			key("high-availability.storageDir")
 			.noDefaultValue()

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index add8e68..1ea6919 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -75,6 +76,7 @@ public class JobManagerOptions {
 	/**
 	 * JVM heap size (in megabytes) for the JobManager.
 	 */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
 	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
 		key("jobmanager.heap.mb")
 		.defaultValue(1024)

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 10c508b..feae587 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.ConfigGroup;
 import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.annotation.docs.Documentation;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -86,6 +87,7 @@ public class SecurityOptions {
 	/**
 	 * Enable SSL support.
 	 */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_SECURITY)
 	public static final ConfigOption<Boolean> SSL_ENABLED =
 		key("security.ssl.enabled")
 			.defaultValue(false)

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 8017f7a..7cdda22 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -35,6 +36,7 @@ public class TaskManagerOptions {
 	/**
 	 * JVM heap size (in megabytes) for the TaskManagers.
 	 */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
 	public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
 			key("taskmanager.heap.mb")
 			.defaultValue(1024)
@@ -147,6 +149,7 @@ public class TaskManagerOptions {
 	/**
 	 * The config parameter defining the number of task slots of a task manager.
 	 */
+	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_PARALLELISM_SLOTS)
 	public static final ConfigOption<Integer> NUM_TASK_SLOTS =
 		key("taskmanager.numberOfTaskSlots")
 			.defaultValue(1)

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index b73a40e..aca722e 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.util.function.ThrowingConsumer;
 
+import static org.apache.flink.docs.util.Utils.escapeCharacters;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -43,8 +45,6 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.flink.docs.util.Utils.escapeCharacters;
-
 /**
  * Class used for generating code based documentation of configuration parameters.
  */
@@ -58,6 +58,11 @@ public class ConfigOptionsDocGenerator {
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
 	};
 
+	static final String DEFAULT_PATH_PREFIX = "src/main/java";
+
+	@VisibleForTesting
+	static final String COMMON_SECTION_FILE_NAME = "common_section.html";
+
 	private static final String CLASS_NAME_GROUP = "className";
 	private static final String CLASS_PREFIX_GROUP = "classPrefix";
 	private static final Pattern CLASS_NAME_PATTERN = Pattern.compile("(?<" + CLASS_NAME_GROUP + ">(?<" + CLASS_PREFIX_GROUP + ">[a-zA-Z]*)(?:Options|Config|Parameters))(?:\\.java)?");
@@ -69,6 +74,9 @@ public class ConfigOptionsDocGenerator {
 	 * the class is annotated with {@link ConfigGroups}. The tables contain the key, default value and description for
 	 * every {@link ConfigOption}.
 	 *
+	 * <p>One additional table is generated containing all {@link ConfigOption ConfigOptions} that are annotated with
+	 * {@link org.apache.flink.annotation.docs.Documentation.CommonOption}.
+	 *
 	 * @param args
 	 *  [0] output directory for the generated files
 	 *  [1] project root directory
@@ -78,12 +86,42 @@ public class ConfigOptionsDocGenerator {
 		String rootDir = args[1];
 
 		for (OptionsClassLocation location : LOCATIONS) {
-			createTable(rootDir, location.getModule(), location.getPackage(), outputDirectory);
+			createTable(rootDir, location.getModule(), location.getPackage(), outputDirectory, DEFAULT_PATH_PREFIX);
+		}
+
+		generateCommonSection(rootDir, outputDirectory, LOCATIONS, DEFAULT_PATH_PREFIX);
+	}
+
+	@VisibleForTesting
+	static void generateCommonSection(String rootDir, String outputDirectory, OptionsClassLocation[] locations, String pathPrefix) throws IOException, ClassNotFoundException {
+		List<OptionWithMetaInfo> commonOptions = new ArrayList<>(32);
+		for (OptionsClassLocation location : locations) {
+			commonOptions.addAll(findCommonOptions(rootDir, location.getModule(), location.getPackage(), pathPrefix));
 		}
+		commonOptions.sort((o1, o2) -> {
+			int position1 = o1.field.getAnnotation(Documentation.CommonOption.class).position();
+			int position2 = o2.field.getAnnotation(Documentation.CommonOption.class).position();
+			if (position1 == position2) {
+				return o1.option.key().compareTo(o2.option.key());
+			} else {
+				return Integer.compare(position1, position2);
+			}
+		});
+
+		String commonHtmlTable = toHtmlTable(commonOptions);
+		Files.write(Paths.get(outputDirectory, COMMON_SECTION_FILE_NAME), commonHtmlTable.getBytes(StandardCharsets.UTF_8));
+	}
+
+	private static Collection<OptionWithMetaInfo> findCommonOptions(String rootDir, String module, String packageName, String pathPrefix) throws IOException, ClassNotFoundException {
+		Collection<OptionWithMetaInfo> commonOptions = new ArrayList<>(32);
+		processConfigOptions(rootDir, module, packageName, pathPrefix, optionsClass -> extractConfigOptions(optionsClass).stream()
+			.filter(optionWithMetaInfo -> optionWithMetaInfo.field.getAnnotation(Documentation.CommonOption.class) != null)
+			.forEachOrdered(commonOptions::add));
+		return commonOptions;
 	}
 
-	private static void createTable(String rootDir, String module, String packageName, String outputDirectory) throws IOException, ClassNotFoundException {
-		processConfigOptions(rootDir, module, packageName, optionsClass -> {
+	private static void createTable(String rootDir, String module, String packageName, String outputDirectory, String pathPrefix) throws IOException, ClassNotFoundException {
+		processConfigOptions(rootDir, module, packageName, pathPrefix, optionsClass -> {
 			List<Tuple2<ConfigGroup, String>> tables = generateTablesForClass(optionsClass);
 			for (Tuple2<ConfigGroup, String> group : tables) {
 				String name;
@@ -103,8 +141,8 @@ public class ConfigOptionsDocGenerator {
 		});
 	}
 
-	static void processConfigOptions(String rootDir, String module, String packageName, ThrowingConsumer<Class<?>, IOException> classConsumer) throws IOException, ClassNotFoundException {
-		Path configDir = Paths.get(rootDir, module, "src/main/java", packageName.replaceAll("\\.", "/"));
+	static void processConfigOptions(String rootDir, String module, String packageName, String pathPrefix, ThrowingConsumer<Class<?>, IOException> classConsumer) throws IOException, ClassNotFoundException {
+		Path configDir = Paths.get(rootDir, module, pathPrefix, packageName.replaceAll("\\.", "/"));
 
 		try (DirectoryStream<Path> stream = Files.newDirectoryStream(configDir)) {
 			for (Path entry : stream) {
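
A usage sketch for the updated generator: in addition to the per-class tables, main() now also writes common_section.html containing every option annotated with Documentation.CommonOption, sorted by position and then by key. Only the argument order ([0] output directory, [1] project root) is taken from the javadoc above; the paths below are placeholders, not the committed build setup.

    import org.apache.flink.docs.configuration.ConfigOptionsDocGenerator;

    public final class GenerateConfigDocs {

        public static void main(String[] args) throws Exception {
            // [0] output directory for the generated files, [1] project root directory
            // (illustrative values)
            ConfigOptionsDocGenerator.main(new String[] {
                "docs/_includes/generated",
                "/path/to/flink"
            });
            // afterwards, docs/_includes/generated/common_section.html holds the common-options table
        }
    }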

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
index 4176c5d..26cba80 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
@@ -24,9 +24,16 @@ import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.docs.configuration.data.TestCommonOptions;
+import org.apache.flink.util.FileUtils;
 
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
 
@@ -37,6 +44,9 @@ import static org.junit.Assert.assertEquals;
  */
 public class ConfigOptionsDocGeneratorTest {
 
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
 	static class TestConfigGroup {
 		public static ConfigOption<Integer> firstOption = ConfigOptions
 			.key("first.option.a")
@@ -215,4 +225,42 @@ public class ConfigOptionsDocGeneratorTest {
 		assertEquals(expectedTable, htmlTable);
 	}
 
+	@Test
+	public void testCommonOptions() throws IOException, ClassNotFoundException {
+		final String projectRootDir = System.getProperty("rootDir");
+		final String outputDirectory = TMP.newFolder().getAbsolutePath();
+
+		final OptionsClassLocation[] locations = new OptionsClassLocation[] {
+			new OptionsClassLocation("flink-docs", TestCommonOptions.class.getPackage().getName())
+		};
+
+		ConfigOptionsDocGenerator.generateCommonSection(projectRootDir, outputDirectory, locations, "src/test/java");
+
+		String expected =
+			"<table class=\"table table-bordered\">\n" +
+			"    <thead>\n" +
+			"        <tr>\n" +
+			"            <th class=\"text-left\" style=\"width: 20%\">Key</th>\n" +
+			"            <th class=\"text-left\" style=\"width: 15%\">Default</th>\n" +
+			"            <th class=\"text-left\" style=\"width: 65%\">Description</th>\n" +
+			"        </tr>\n" +
+			"    </thead>\n" +
+			"    <tbody>\n" +
+			"        <tr>\n" +
+			"            <td><h5>" + TestCommonOptions.COMMON_POSITIONED_OPTION.key() + "</h5></td>\n" +
+			"            <td style=\"word-wrap: break-word;\">" + TestCommonOptions.COMMON_POSITIONED_OPTION.defaultValue() + "</td>\n" +
+			"            <td>" + TestCommonOptions.COMMON_POSITIONED_OPTION.description() + "</td>\n" +
+			"        </tr>\n" +
+			"        <tr>\n" +
+			"            <td><h5>" + TestCommonOptions.COMMON_OPTION.key() + "</h5></td>\n" +
+			"            <td style=\"word-wrap: break-word;\">" + TestCommonOptions.COMMON_OPTION.defaultValue() + "</td>\n" +
+			"            <td>" + TestCommonOptions.COMMON_OPTION.description() + "</td>\n" +
+			"        </tr>\n" +
+			"    </tbody>\n" +
+			"</table>\n";
+
+		String output = FileUtils.readFile(Paths.get(outputDirectory, ConfigOptionsDocGenerator.COMMON_SECTION_FILE_NAME).toFile(), StandardCharsets.UTF_8.name());
+
+		assertEquals(expected, output);
+	}
 }
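
The expected table order in the test above (COMMON_POSITIONED_OPTION before COMMON_OPTION) follows from the comparator added to generateCommonSection: explicitly positioned options sort ahead of un-positioned ones, and ties fall back to the option key. A standalone sketch of that ordering, assuming the default position is larger than any explicit one (the actual default is not shown in this excerpt):

    import java.util.Arrays;
    import java.util.List;

    public final class CommonSectionOrderDemo {

        // minimal stand-in for OptionWithMetaInfo: option key plus the annotation's position value
        private static final class Opt {
            final String key;
            final int position;

            Opt(String key, int position) {
                this.key = key;
                this.position = position;
            }
        }

        public static void main(String[] args) {
            int assumedDefaultPosition = Integer.MAX_VALUE; // assumption for un-positioned common options
            List<Opt> options = Arrays.asList(
                new Opt("first.option.a", assumedDefaultPosition), // COMMON_OPTION, no explicit position
                new Opt("third.option.a", 2));                     // COMMON_POSITIONED_OPTION(position = 2)

            // same comparator as in generateCommonSection: position first, key as tie-breaker
            options.sort((o1, o2) -> o1.position == o2.position
                ? o1.key.compareTo(o2.key)
                : Integer.compare(o1.position, o2.position));

            options.forEach(o -> System.out.println(o.key)); // prints third.option.a, then first.option.a
        }
    }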

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
index 5b6b5b5..2d20212 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.docs.configuration;
 
+import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.ConfigOption;
 
 import org.jsoup.Jsoup;
@@ -35,9 +36,12 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.COMMON_SECTION_FILE_NAME;
+import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.DEFAULT_PATH_PREFIX;
 import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.LOCATIONS;
 import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.extractConfigOptions;
 import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.processConfigOptions;
@@ -52,10 +56,23 @@ import static org.apache.flink.docs.configuration.ConfigOptionsDocGenerator.stri
 public class ConfigOptionsDocsCompletenessITCase {
 
 	@Test
-	public void testDocsCompleteness() throws IOException, ClassNotFoundException {
+	public void testCommonSectionCompleteness() throws IOException, ClassNotFoundException {
+		Map<String, DocumentedOption> documentedOptions = parseDocumentedCommonOptions();
+		Map<String, ExistingOption> existingOptions = findExistingOptions(
+			optionWithMetaInfo -> optionWithMetaInfo.field.getAnnotation(Documentation.CommonOption.class) != null);
+
+		compareDocumentedAndExistingOptions(documentedOptions, existingOptions);
+	}
+
+	@Test
+	public void testFullReferenceCompleteness() throws IOException, ClassNotFoundException {
 		Map<String, DocumentedOption> documentedOptions = parseDocumentedOptions();
-		Map<String, ExistingOption> existingOptions = findExistingOptions();
+		Map<String, ExistingOption> existingOptions = findExistingOptions(ignored -> true);
+
+		compareDocumentedAndExistingOptions(documentedOptions, existingOptions);
+	}
 
+	private static void compareDocumentedAndExistingOptions(Map<String, DocumentedOption> documentedOptions, Map<String, ExistingOption> existingOptions) {
 		final Collection<String> problems = new ArrayList<>(0);
 
 		// first check that all existing options are properly documented
@@ -98,6 +115,27 @@ public class ConfigOptionsDocsCompletenessITCase {
 		}
 	}
 
+	private static Map<String, DocumentedOption> parseDocumentedCommonOptions() throws IOException {
+		Path commonSection = Paths.get(System.getProperty("rootDir"), "docs", "_includes", "generated", COMMON_SECTION_FILE_NAME);
+		return parseDocumentedOptionsFromFile(commonSection).stream()
+			.collect(Collectors.toMap(option -> option.key, option -> option, (option1, option2) -> {
+				if (option1.equals(option2)) {
+					// we allow multiple instances of ConfigOptions with the same key if they are identical
+					return option1;
+				} else {
+					// found a ConfigOption pair with the same key that aren't equal
+					// we fail here outright as this is not a documentation-completeness problem
+					if (!option1.defaultValue.equals(option2.defaultValue)) {
+						throw new AssertionError("Documentation contains distinct defaults for " +
+							option1.key + " in " + option1.containingFile + " and " + option2.containingFile + '.');
+					} else {
+						throw new AssertionError("Documentation contains distinct descriptions for " +
+							option1.key + " in " + option1.containingFile + " and " + option2.containingFile + '.');
+					}
+				}
+			}));
+	}
+
 	private static Map<String, DocumentedOption> parseDocumentedOptions() throws IOException {
 		Path includeFolder = Paths.get(System.getProperty("rootDir"), "docs", "_includes", "generated").toAbsolutePath();
 		return Files.list(includeFolder)
@@ -141,24 +179,26 @@ public class ConfigOptionsDocsCompletenessITCase {
 			.collect(Collectors.toList());
 	}
 
-	private static Map<String, ExistingOption> findExistingOptions() throws IOException, ClassNotFoundException {
+	private static Map<String, ExistingOption> findExistingOptions(Predicate<ConfigOptionsDocGenerator.OptionWithMetaInfo> predicate) throws IOException, ClassNotFoundException {
 		Map<String, ExistingOption> existingOptions = new HashMap<>(32);
 
 		for (OptionsClassLocation location : LOCATIONS) {
-			processConfigOptions(System.getProperty("rootDir"), location.getModule(), location.getPackage(), optionsClass -> {
+			processConfigOptions(System.getProperty("rootDir"), location.getModule(), location.getPackage(), DEFAULT_PATH_PREFIX, optionsClass -> {
 				List<ConfigOptionsDocGenerator.OptionWithMetaInfo> configOptions = extractConfigOptions(optionsClass);
 				for (ConfigOptionsDocGenerator.OptionWithMetaInfo option : configOptions) {
-					String key = option.option.key();
-					String defaultValue = stringifyDefault(option);
-					String description = option.option.description();
-					ExistingOption duplicate = existingOptions.put(key, new ExistingOption(key, defaultValue, description, optionsClass));
-					if (duplicate != null) {
-						// multiple documented options have the same key
-						// we fail here outright as this is not a documentation-completeness problem
-						if (!(duplicate.description.equals(description))) {
-							throw new AssertionError("Ambiguous option " + key + " due to distinct descriptions.");
-						} else if (!duplicate.defaultValue.equals(defaultValue)) {
-							throw new AssertionError("Ambiguous option " + key + " due to distinct default values (" + defaultValue + " vs " + duplicate.defaultValue + ").");
+					if (predicate.test(option)) {
+						String key = option.option.key();
+						String defaultValue = stringifyDefault(option);
+						String description = option.option.description();
+						ExistingOption duplicate = existingOptions.put(key, new ExistingOption(key, defaultValue, description, optionsClass));
+						if (duplicate != null) {
+							// multiple documented options have the same key
+							// we fail here outright as this is not a documentation-completeness problem
+							if (!(duplicate.description.equals(description))) {
+								throw new AssertionError("Ambiguous option " + key + " due to distinct descriptions.");
+							} else if (!duplicate.defaultValue.equals(defaultValue)) {
+								throw new AssertionError("Ambiguous option " + key + " due to distinct default values (" + defaultValue + " vs " + duplicate.defaultValue + ").");
+							}
 						}
 					}
 				}
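
The toMap merge function in parseDocumentedCommonOptions above encodes a small but easy-to-miss rule: identical duplicate entries are collapsed into one, while conflicting duplicates abort the test outright. A standalone JDK-only sketch of the same pattern (not Flink code):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public final class MergeFunctionDemo {

        public static void main(String[] args) {
            // two occurrences of the same key with the same value: allowed, collapsed to one entry
            List<String[]> documented = Arrays.asList(
                new String[] {"state.checkpoints.dir", "(none)"},
                new String[] {"state.checkpoints.dir", "(none)"});

            Map<String, String> byKey = documented.stream()
                .collect(Collectors.toMap(o -> o[0], o -> o[1], (v1, v2) -> {
                    if (v1.equals(v2)) {
                        return v1; // identical duplicates are fine
                    }
                    // conflicting duplicates are not a completeness problem, so fail outright
                    throw new AssertionError("distinct values documented for the same key");
                }));

            System.out.println(byKey); // {state.checkpoints.dir=(none)}
        }
    }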

http://git-wip-us.apache.org/repos/asf/flink/blob/eaff4da1/flink-docs/src/test/java/org/apache/flink/docs/configuration/data/TestCommonOptions.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/data/TestCommonOptions.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/data/TestCommonOptions.java
new file mode 100644
index 0000000..f0bf37c
--- /dev/null
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/data/TestCommonOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.docs.configuration.data;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Collection of test {@link ConfigOptions ConfigOptions}.
+ */
+public class TestCommonOptions {
+
+	@Documentation.CommonOption
+	public static final ConfigOption<Integer> COMMON_OPTION = ConfigOptions
+		.key("first.option.a")
+		.defaultValue(2)
+		.withDescription("This is the description for the common option.");
+
+	public static final ConfigOption<String> GENERIC_OPTION = ConfigOptions
+		.key("second.option.a")
+		.noDefaultValue()
+		.withDescription("This is the description for the generic option.");
+
+	@Documentation.CommonOption(position = 2)
+	public static final ConfigOption<Integer> COMMON_POSITIONED_OPTION = ConfigOptions
+		.key("third.option.a")
+		.defaultValue(3)
+		.withDescription("This is the description for the positioned common option.");
+}
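
To make the filtering in findCommonOptions concrete: of the three options in the test class above, only the two annotated fields qualify for the common section, while GENERIC_OPTION is skipped. A standalone reflection sketch (not part of the commit) that mirrors the generator's check:

    import java.lang.reflect.Field;

    import org.apache.flink.annotation.docs.Documentation;
    import org.apache.flink.docs.configuration.data.TestCommonOptions;

    public final class CommonOptionFilterDemo {

        public static void main(String[] args) {
            for (Field field : TestCommonOptions.class.getFields()) {
                // same check as the generator: presence of the CommonOption annotation on the field
                boolean isCommon = field.getAnnotation(Documentation.CommonOption.class) != null;
                System.out.println(field.getName() + " -> " + (isCommon ? "common section" : "skipped"));
            }
        }
    }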