You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/21 11:09:14 UTC

[4/6] flink git commit: [FLINK-4770] [core] Introduce 'CoreOptions'

[FLINK-4770] [core] Introduce 'CoreOptions'

The CoreOptions should hold all essential configuration values that are not specific to
JobManager, TaskManager or any feature area, like HighAvailability or Security.

Examples of such options are:
  - default java options
  - default parallelism
  - default state backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4047965
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4047965
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4047965

Branch: refs/heads/master
Commit: a4047965898adc0ba7bf74280a3b22792ced3399
Parents: 544f534
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 17 14:57:20 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 21:29:16 2017 +0100

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkSecuredITCase.java |  3 +-
 .../flink/configuration/ConfigConstants.java    | 11 +++++-
 .../apache/flink/configuration/CoreOptions.java | 40 ++++++++++++++++++++
 .../org/apache/flink/hdfstests/HDFSTest.java    |  4 +-
 .../clusterframework/BootstrapTools.java        |  3 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |  4 +-
 .../clusterframework/BootstrapToolsTest.java    |  3 +-
 .../BlobLibraryCacheRecoveryITCase.java         |  4 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  3 +-
 .../streaming/runtime/tasks/StreamTask.java     |  6 +--
 ...AlignedProcessingTimeWindowOperatorTest.java |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  6 +--
 .../test/checkpointing/RescalingITCase.java     |  3 +-
 .../test/checkpointing/SavepointITCase.java     |  5 ++-
 .../utils/SavepointMigrationTestBase.java       |  3 +-
 .../test/classloading/ClassLoaderITCase.java    |  3 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  3 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  4 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |  3 +-
 19 files changed, 86 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index fa46fc7..768ca5e 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -216,7 +217,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 			config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
 			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 			config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
 			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 44a78f9..5129f20 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -741,8 +741,11 @@ public final class ConfigConstants {
 	// ----------------------------- Streaming --------------------------------
 	
 	/**
-	 * State backend for checkpoints;
+	 * State backend for checkpoints
+	 * 
+	 * @deprecated Use {@link CoreOptions#STATE_BACKEND} instead.
 	 */
+	@Deprecated
 	public static final String STATE_BACKEND = "state.backend";
 	
 	// ----------------------------- Miscellaneous ----------------------------
@@ -756,7 +759,11 @@ public final class ConfigConstants {
 	 */
 	@Deprecated
 	public static final String FLINK_BASE_DIR_PATH_KEY = "flink.base.dir.path";
-	
+
+	/**
+	 * @deprecated Use {@link CoreOptions#FLINK_JVM_OPTIONS} instead.
+	 */
+	@Deprecated
 	public static final String FLINK_JVM_OPTIONS = "env.java.opts";
 
 	// --------------------------- High Availability --------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
new file mode 100644
index 0000000..70e5f0b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+/** Essential options that are not specific to the JobManager, TaskManager, or any feature area. */
+@PublicEvolving
+public class CoreOptions {
+
+	/**
+	 * Custom JVM options for Flink processes; the default value is the empty string.
+	 */
+	public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions
+		.key("env.java.opts")
+		.defaultValue("");
+	/** Default parallelism; the default value is -1. */
+	public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
+		.key("parallelism.default")
+		.defaultValue(-1);
+	/** State backend for checkpoints; this option has no default value. */
+	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+		.key("state.backend")
+		.noDefaultValue();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 49db0f8..75e666f 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -230,7 +230,7 @@ public class HDFSTest {
 		org.apache.flink.configuration.Configuration
 			config = new org.apache.flink.configuration.Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER");
+		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
 
 		BlobRecoveryITCase.testBlobServerRecovery(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index a0cf1d5..ebc9af8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -27,6 +27,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -358,7 +359,7 @@ public class BootstrapTools {
 			.put("jvmmem", 	"-Xms" + tmParams.taskManagerHeapSizeMB() + "m " +
 							"-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " +
 							"-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
-		String javaOpts = flinkConfig.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
 		//applicable only for YarnMiniCluster secure test run
 		//krb5.conf file will be available as local resource in JM/TM container
 		if(hasKrb5) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index d043665..a7f792f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -55,7 +55,7 @@ public class BlobRecoveryITCase {
 	public void testBlobServerRecovery() throws Exception {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());
 
 		testBlobServerRecovery(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index b08e1f4..1d100da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -209,7 +210,7 @@ public class BootstrapToolsTest {
 					true, true, true, this.getClass()));
 
 		// logback + log4j, with/out krb5, different JVM opts
-		cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+		cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
 		assertEquals(
 			java + " " + jvmmem +
 				" " + jvmOpts +

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index d3925be..7f75acc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -65,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase {
 
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 07cec32..42338cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
@@ -80,7 +81,7 @@ public class ZooKeeperTestUtils {
 		config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
 
 		// File system state backend
-		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6b33d12..d734dc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -694,7 +694,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		} else {
 			// see if we have a backend specified in the configuration
 			Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
-			String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
+			String backendName = flinkConfig.getString(CoreOptions.STATE_BACKEND, null);
 
 			if (backendName == null) {
 				LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
@@ -731,7 +731,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
 					} catch (ClassCastException e) {
 						throw new IllegalConfigurationException("The class configured under '" +
-								ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
+								CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
 								backendName + ')');
 					} catch (Throwable t) {
 						throw new IllegalConfigurationException("Cannot create configured state backend", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 508d2e1..6f0e881 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -1074,7 +1074,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	private static StreamTask<?, ?> createMockTask() {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
+		configuration.setString(CoreOptions.STATE_BACKEND, "jobmanager");
 
 		StreamTask<?, ?> task = mock(StreamTask.class);
 		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a63ee8..887ea4f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -192,7 +192,7 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testStateBackendLoadingAndClosing() throws Exception {
 		Configuration taskManagerConfig = new Configuration();
-		taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+		taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
@@ -216,7 +216,7 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testStateBackendClosingOnFailure() throws Exception {
 		Configuration taskManagerConfig = new Configuration();
-		taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+		taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 073632a..875d0ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -105,7 +106,7 @@ public class RescalingITCase extends TestLogger {
 		final File checkpointDir = temporaryFolder.newFolder();
 		final File savepointDir = temporaryFolder.newFolder();
 
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+		config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
 		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 77777d1..128522b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -168,7 +169,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
 			LOG.info("Created temporary savepoint directory: " + savepointDir + ".");
 
-			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 				checkpointDir.toURI().toString());
 			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
@@ -701,7 +702,7 @@ public class SavepointITCase extends TestLogger {
 			fail("Test setup failed: failed to create temporary directories.");
 		}
 
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+		config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 				checkpointDir.toURI().toString());
 		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 1a8a0a0..fced68c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -90,7 +91,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
 		LOG.info("Created savepoint directory: " + savepointDir + ".");
 
-		config.setString(ConfigConstants.STATE_BACKEND, "memory");
+		config.setString(CoreOptions.STATE_BACKEND, "memory");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
 		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
 		config.setString("state.savepoints.dir", savepointDir.toURI().toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index ca69e80..f25a302 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -97,7 +98,7 @@ public class ClassLoaderITCase extends TestLogger {
 		parallelism = 4;
 
 		// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+		config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index d959e14..546e3d7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -118,7 +119,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
 		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
-			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
+			"@@" + CoreOptions.STATE_BACKEND.key() + "=FILESYSTEM" + // key(): concatenating the ConfigOption would use toString()
 			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
 			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 21599c1..edf57b3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
@@ -1232,8 +1233,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// ------------------ Prepare Application Master Container  ------------------------------
 
 		// respect custom JVM options in the YAML file
-		String javaOpts =
-			flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
 		//applicable only for YarnMiniCluster secure test run
 		//krb5.conf file will be available as local resource in JM/TM container
 		if (hasKrb5) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 70ccae8..ad3ebcd 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.fs.Path;
@@ -203,7 +204,7 @@ public class YarnClusterDescriptorTest {
 				.getCommands().get(0));
 
 		// logback + log4j, with/out krb5, different JVM opts
-		cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+		cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
 		assertEquals(
 			java + " " + jvmmem +
 				" " + jvmOpts +