You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/21 11:09:14 UTC
[4/6] flink git commit: [FLINK-4770] [core] Introduce 'CoreOptions'
[FLINK-4770] [core] Introduce 'CoreOptions'
The CoreOptions should hold all essential configuration values that are not specific to
JobManager, TaskManager or any feature area, like HighAvailability or Security.
Examples of such values are:
- default java options
- default parallelism
- default state backend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4047965
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4047965
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4047965
Branch: refs/heads/master
Commit: a4047965898adc0ba7bf74280a3b22792ced3399
Parents: 544f534
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 17 14:57:20 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 21:29:16 2017 +0100
----------------------------------------------------------------------
.../connectors/fs/RollingSinkSecuredITCase.java | 3 +-
.../flink/configuration/ConfigConstants.java | 11 +++++-
.../apache/flink/configuration/CoreOptions.java | 40 ++++++++++++++++++++
.../org/apache/flink/hdfstests/HDFSTest.java | 4 +-
.../clusterframework/BootstrapTools.java | 3 +-
.../flink/runtime/blob/BlobRecoveryITCase.java | 4 +-
.../clusterframework/BootstrapToolsTest.java | 3 +-
.../BlobLibraryCacheRecoveryITCase.java | 4 +-
.../runtime/testutils/ZooKeeperTestUtils.java | 3 +-
.../streaming/runtime/tasks/StreamTask.java | 6 +--
...AlignedProcessingTimeWindowOperatorTest.java | 4 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 6 +--
.../test/checkpointing/RescalingITCase.java | 3 +-
.../test/checkpointing/SavepointITCase.java | 5 ++-
.../utils/SavepointMigrationTestBase.java | 3 +-
.../test/classloading/ClassLoaderITCase.java | 3 +-
.../flink/yarn/YARNHighAvailabilityITCase.java | 3 +-
.../yarn/AbstractYarnClusterDescriptor.java | 4 +-
.../flink/yarn/YarnClusterDescriptorTest.java | 3 +-
19 files changed, 86 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index fa46fc7..768ca5e 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.fs;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -216,7 +217,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
- config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+ config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 44a78f9..5129f20 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -741,8 +741,11 @@ public final class ConfigConstants {
// ----------------------------- Streaming --------------------------------
/**
- * State backend for checkpoints;
+ * State backend for checkpoints
+ *
+ * @deprecated Use {@link CoreOptions#STATE_BACKEND} instead.
*/
+ @Deprecated
public static final String STATE_BACKEND = "state.backend";
// ----------------------------- Miscellaneous ----------------------------
@@ -756,7 +759,11 @@ public final class ConfigConstants {
*/
@Deprecated
public static final String FLINK_BASE_DIR_PATH_KEY = "flink.base.dir.path";
-
+
+ /**
+ * @deprecated Use {@link CoreOptions#FLINK_JVM_OPTIONS} instead.
+ */
+ @Deprecated
public static final String FLINK_JVM_OPTIONS = "env.java.opts";
// --------------------------- High Availability --------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
new file mode 100644
index 0000000..70e5f0b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Holds the essential configuration options that are not specific to the
+ * JobManager, the TaskManager, or a single feature area such as high
+ * availability or security.
+ *
+ * <p>The options defined here supersede the plain string keys of the same
+ * name in {@link ConfigConstants} where those exist.
+ */
+@PublicEvolving
+public class CoreOptions {
+
+ /**
+ * Custom JVM options, configured under the key {@code env.java.opts}.
+ * Defaults to the empty string, i.e. no additional options.
+ *
+ * <p>Replaces the deprecated {@link ConfigConstants#FLINK_JVM_OPTIONS}.
+ */
+ public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions
+ .key("env.java.opts")
+ .defaultValue("");
+
+ /**
+ * The default parallelism, configured under the key {@code parallelism.default}.
+ * Defaults to {@code -1}.
+ *
+ * <p>NOTE(review): {@code -1} presumably means "not configured" - confirm
+ * against the code that reads this option.
+ */
+ public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
+ .key("parallelism.default")
+ .defaultValue(-1);
+
+ /**
+ * The state backend used for checkpoints, configured under the key
+ * {@code state.backend}. This option has no default value, so reading it
+ * yields {@code null} unless it is set or a fallback is supplied by the caller.
+ *
+ * <p>Replaces the deprecated {@link ConfigConstants#STATE_BACKEND}.
+ */
+ public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+ .key("state.backend")
+ .noDefaultValue();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 49db0f8..75e666f 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -230,7 +230,7 @@ public class HDFSTest {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER");
+ config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
BlobRecoveryITCase.testBlobServerRecovery(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index a0cf1d5..ebc9af8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -27,6 +27,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -358,7 +359,7 @@ public class BootstrapTools {
.put("jvmmem", "-Xms" + tmParams.taskManagerHeapSizeMB() + "m " +
"-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " +
"-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
- String javaOpts = flinkConfig.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+ String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
//applicable only for YarnMiniCluster secure test run
//krb5.conf file will be available as local resource in JM/TM container
if(hasKrb5) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index d043665..a7f792f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -55,7 +55,7 @@ public class BlobRecoveryITCase {
public void testBlobServerRecovery() throws Exception {
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+ config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());
testBlobServerRecovery(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index b08e1f4..1d100da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.junit.Test;
import java.util.HashMap;
@@ -209,7 +210,7 @@ public class BootstrapToolsTest {
true, true, true, this.getClass()));
// logback + log4j, with/out krb5, different JVM opts
- cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+ cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
assertEquals(
java + " " + jvmmem +
" " + jvmOpts +
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index d3925be..7f75acc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.execution.librarycache;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
@@ -65,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase {
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+ config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 07cec32..42338cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testutils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
@@ -80,7 +81,7 @@ public class ZooKeeperTestUtils {
config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
// File system state backend
- config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+ config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6b33d12..d734dc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -694,7 +694,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
} else {
// see if we have a backend specified in the configuration
Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
- String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
+ String backendName = flinkConfig.getString(CoreOptions.STATE_BACKEND, null);
if (backendName == null) {
LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
@@ -731,7 +731,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
} catch (ClassCastException e) {
throw new IllegalConfigurationException("The class configured under '" +
- ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
+ CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
backendName + ')');
} catch (Throwable t) {
throw new IllegalConfigurationException("Cannot create configured state backend", t);
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 508d2e1..6f0e881 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -1074,7 +1074,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
private static StreamTask<?, ?> createMockTask() {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
+ configuration.setString(CoreOptions.STATE_BACKEND, "jobmanager");
StreamTask<?, ?> task = mock(StreamTask.class);
when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a63ee8..887ea4f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
@@ -192,7 +192,7 @@ public class StreamTaskTest extends TestLogger {
@Test
public void testStateBackendLoadingAndClosing() throws Exception {
Configuration taskManagerConfig = new Configuration();
- taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+ taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
@@ -216,7 +216,7 @@ public class StreamTaskTest extends TestLogger {
@Test
public void testStateBackendClosingOnFailure() throws Exception {
Configuration taskManagerConfig = new Configuration();
- taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+ taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 073632a..875d0ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -105,7 +106,7 @@ public class RescalingITCase extends TestLogger {
final File checkpointDir = temporaryFolder.newFolder();
final File savepointDir = temporaryFolder.newFolder();
- config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+ config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 77777d1..128522b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -168,7 +169,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
LOG.info("Created temporary savepoint directory: " + savepointDir + ".");
- config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+ config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
checkpointDir.toURI().toString());
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
@@ -701,7 +702,7 @@ public class SavepointITCase extends TestLogger {
fail("Test setup failed: failed to create temporary directories.");
}
- config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+ config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
checkpointDir.toURI().toString());
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 1a8a0a0..fced68c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -90,7 +91,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
LOG.info("Created savepoint directory: " + savepointDir + ".");
- config.setString(ConfigConstants.STATE_BACKEND, "memory");
+ config.setString(CoreOptions.STATE_BACKEND, "memory");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
config.setString("state.savepoints.dir", savepointDir.toURI().toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index ca69e80..f25a302 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
@@ -97,7 +98,7 @@ public class ClassLoaderITCase extends TestLogger {
parallelism = 4;
// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
- config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+ config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
FOLDER.newFolder().getAbsoluteFile().toURI().toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index d959e14..546e3d7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -118,7 +119,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
- "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
+ "@@" + CoreOptions.STATE_BACKEND.key() + "=FILESYSTEM" + // need the key string, not ConfigOption#toString()
"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 21599c1..edf57b3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
@@ -1232,8 +1233,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// ------------------ Prepare Application Master Container ------------------------------
// respect custom JVM options in the YAML file
- String javaOpts =
- flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+ String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
//applicable only for YarnMiniCluster secure test run
//krb5.conf file will be available as local resource in JM/TM container
if (hasKrb5) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 70ccae8..ad3ebcd 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.Path;
@@ -203,7 +204,7 @@ public class YarnClusterDescriptorTest {
.getCommands().get(0));
// logback + log4j, with/out krb5, different JVM opts
- cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+ cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
assertEquals(
java + " " + jvmmem +
" " + jvmOpts +