You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 13:39:31 UTC
[7/9] flink git commit: Revert "[FLINK-6461] Deprecate web config defaults in ConfigConstants"
Revert "[FLINK-6461] Deprecate web config defaults in ConfigConstants"
This reverts commit 9708550a40127f833728bcc85847035c3b5fbad8.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2baac899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2baac899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2baac899
Branch: refs/heads/release-1.3
Commit: 2baac899ff620888f41f11762aadf134a59ff548
Parents: cc6036f
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun May 14 15:37:22 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 15:37:22 2017 +0200
----------------------------------------------------------------------
.../client/program/StandaloneClusterClient.java | 5 +-
.../flink/configuration/ConfigConstants.java | 65 +++-----------------
.../flink/configuration/JobManagerOptions.java | 7 ---
.../flink/api/java/ExecutionEnvironment.java | 4 ++
.../runtime/webmonitor/WebMonitorConfig.java | 36 ++++++++++-
.../runtime/webmonitor/WebRuntimeMonitor.java | 32 +++++++---
.../webmonitor/WebRuntimeMonitorITCase.java | 13 ++--
.../executiongraph/ExecutionGraphBuilder.java | 5 +-
.../runtime/webmonitor/WebMonitorUtils.java | 6 +-
.../flink/runtime/jobmanager/JobManager.scala | 3 +-
.../environment/StreamExecutionEnvironment.java | 4 ++
.../apache/flink/test/util/TestBaseUtils.java | 5 +-
.../flink/test/web/WebFrontendITCase.java | 5 +-
.../flink/yarn/YarnApplicationMasterRunner.java | 3 +-
14 files changed, 95 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 7517504..fd179c0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -18,8 +18,8 @@
package org.apache.flink.client.program;
import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -53,7 +53,8 @@ public class StandaloneClusterClient extends ClusterClient {
@Override
public String getWebInterfaceURL() {
String host = this.getJobManagerAddress().getHostString();
- int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT);
+ int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
return "http://" + host + ":" + port;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c3704be..92e6b5d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1338,81 +1338,36 @@ public final class ConfigConstants {
key("jobmanager.web.address")
.noDefaultValue();
- /**
- * The config key for the port of the JobManager web frontend.
- * Setting this value to {@code -1} disables the web frontend.
- *
- * @deprecated use {@link JobManagerOptions#WEB_PORT} instead
- */
- @Deprecated
+ /** The config key for the port of the JobManager web frontend.
+ * Setting this value to {@code -1} disables the web frontend. */
public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
- /**
- * Default value to override SSL support for the JobManager web UI
- *
- * @deprecated use {@link JobManagerOptions#WEB_SSL_ENABLED} instead
- */
- @Deprecated
+ /** Default value to override SSL support for the JobManager web UI */
public static final boolean DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED = true;
- /**
- * The default number of archived jobs for the jobmanager
- *
- * @deprecated use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead
- */
- @Deprecated
+ /** The default number of archived jobs for the jobmanager */
public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5;
- /**
- * By default, submitting jobs from the web-frontend is allowed.
- *
- * @deprecated use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead
- */
- @Deprecated
+ /** By default, submitting jobs from the web-frontend is allowed. */
public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true;
/** @deprecated Config key has been deprecated. Therefore, no default value required. */
@Deprecated
public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false;
- /**
- * Default number of checkpoints to remember for recent history.
- *
- * @deprecated use {@link JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead
- */
- @Deprecated
+ /** Default number of checkpoints to remember for recent history. */
public static final int DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10;
- /**
- * Time after which cached stats are cleaned up.
- *
- * @@deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead
- */
- @Deprecated
+ /** Time after which cached stats are cleaned up. */
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000;
- /**
- * Time after which available stats are deprecated and need to be refreshed (by resampling).
- *
- * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead
- */
- @Deprecated
+ /** Time after which available stats are deprecated and need to be refreshed (by resampling). */
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000;
- /**
- * Number of samples to take to determine back pressure.
- *
- * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead
- */
- @Deprecated
+ /** Number of samples to take to determine back pressure. */
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100;
- /**
- * Delay between samples to determine back pressure.
- *
- * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} instead
- */
- @Deprecated
+ /** Delay between samples to determine back pressure. */
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 50;
// ------------------------------ Akka Values ------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 76b6bed..b924e8e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -116,13 +116,6 @@ public class JobManagerOptions {
.defaultValue("*");
/**
- * The config parameter defining the refresh interval for the web-frontend.
- */
- public static final ConfigOption<Long> WEB_REFRESH_INTERVAL =
- key("jobmanager.web.refresh-interval")
- .defaultValue(3000L);
-
- /**
* Config parameter to override SSL support for the JobManager Web UI
*/
public static final ConfigOption<Boolean> WEB_SSL_ENABLED =
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3d8a384..709ef09 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1235,6 +1235,10 @@ public abstract class ExecutionEnvironment {
public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
checkNotNull(conf, "conf");
+ if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+ int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+ conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+ }
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
LocalEnvironment localEnv = new LocalEnvironment(conf);
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index 77537a2..dba2145 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -18,11 +18,39 @@
package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
public class WebMonitorConfig {
+ // ------------------------------------------------------------------------
+ // Config Keys
+ // ------------------------------------------------------------------------
+
+ /** The port for the runtime monitor web-frontend server. */
+ public static final String JOB_MANAGER_WEB_PORT_KEY = ConfigConstants.JOB_MANAGER_WEB_PORT_KEY;
+
+ /** The initial refresh interval for the web dashboard */
+ public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = "jobmanager.web.refresh-interval";
+
+
+ // ------------------------------------------------------------------------
+ // Default values
+ // ------------------------------------------------------------------------
+
+ /** Default port for the web dashboard (= 8081) */
+ public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+
+ /** Default refresh interval for the web dashboard (= 3000 msecs) */
+ public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000;
+
+
+ // ------------------------------------------------------------------------
+ // Config
+ // ------------------------------------------------------------------------
+
/** The configuration queried by this config object */
private final Configuration config;
@@ -39,15 +67,17 @@ public class WebMonitorConfig {
}
public int getWebFrontendPort() {
- return config.getInteger(JobManagerOptions.WEB_PORT);
+ return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
}
public long getRefreshInterval() {
- return config.getLong(JobManagerOptions.WEB_REFRESH_INTERVAL);
+ return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL);
}
public boolean isProgramSubmitEnabled() {
- return config.getBoolean(JobManagerOptions.WEB_SUBMIT_ENABLE);
+ return config.getBoolean(
+ ConfigConstants.JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED);
}
public String getAllowOrigin() {
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 03b53ad..f83fa27 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -25,7 +25,6 @@ import io.netty.handler.codec.http.router.Router;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -192,13 +191,21 @@ public class WebRuntimeMonitor implements WebMonitor {
stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
// Back pressure stats tracker config
- int cleanUpInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_CLEANUP_INTERVAL);
+ int cleanUpInterval = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL);
- int refreshInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_REFRESH_INTERVAL);
+ int refreshInterval = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL);
- int numSamples = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_NUM_SAMPLES);
+ int numSamples = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES);
- int delay = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_DELAY);
+ int delay = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY);
Time delayBetweenSamples = Time.milliseconds(delay);
@@ -212,7 +219,10 @@ public class WebRuntimeMonitor implements WebMonitor {
ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService);
// Config to enable https access to the web-ui
- boolean enableSSL = config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config);
+ boolean enableSSL = config.getBoolean(
+ ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) &&
+ SSLUtils.getSSLEnabled(config);
if (enableSSL) {
LOG.info("Enabling ssl for the web frontend");
@@ -300,7 +310,9 @@ public class WebRuntimeMonitor implements WebMonitor {
// DELETE is the preferred way of stopping a job (Rest-conform)
DELETE(router, new JobStoppingHandler());
- int maxCachedEntries = config.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
+ int maxCachedEntries = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
// Register the checkpoint stats handlers
@@ -513,14 +525,14 @@ public class WebRuntimeMonitor implements WebMonitor {
}
private String getBaseDirStr(Configuration configuration) {
- return configuration.getString(JobManagerOptions.WEB_TMP_DIR);
+ return configuration.getString(ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY, System.getProperty("java.io.tmpdir"));
}
private File getUploadDir(Configuration configuration) {
- File baseDir = new File(configuration.getString(JobManagerOptions.WEB_UPLOAD_DIR,
+ File baseDir = new File(configuration.getString(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY,
getBaseDirStr(configuration)));
- boolean uploadDirSpecified = configuration.contains(JobManagerOptions.WEB_UPLOAD_DIR);
+ boolean uploadDirSpecified = configuration.containsKey(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY);
return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index a51a234..5ccfe90 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -25,7 +25,6 @@ import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -138,8 +137,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());
- config.setInteger(JobManagerOptions.WEB_PORT, 0);
- config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
+ config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
config,
@@ -287,8 +286,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
Files.createFile(new File(logDir, "jobmanager.out").toPath());
final Configuration config = new Configuration();
- config.setInteger(JobManagerOptions.WEB_PORT, 0);
- config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
+ config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString());
@@ -464,8 +463,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
// Web frontend on random port
Configuration config = new Configuration();
- config.setInteger(JobManagerOptions.WEB_PORT, 0);
- config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
+ config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
config,
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 0e76cfb..de0d9d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -206,7 +205,9 @@ public class ExecutionGraphBuilder {
}
// Maximum number of remembered checkpoints
- int historySize = jobManagerConfig.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
+ int historySize = jobManagerConfig.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
historySize,
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index dd9527e..2baadb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -25,8 +25,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.net.URI;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -80,14 +80,14 @@ public final class WebMonitorUtils {
if (logFilePath == null) {
LOG.warn("Log file environment variable '{}' is not set.", logEnv);
- logFilePath = config.getString(JobManagerOptions.WEB_LOG_PATH);
+ logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
}
// not configured, cannot serve log files
if (logFilePath == null || logFilePath.length() < 4) {
LOG.warn("JobManager log files are unavailable in the web dashboard. " +
"Log file location not found in environment variable '{}' or configuration key '{}'.",
- logEnv, JobManagerOptions.WEB_LOG_PATH.key());
+ logEnv, ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
return new LogFileLocation(null, null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 57a6415..5092643 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2532,7 +2532,8 @@ object JobManager {
val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration)
- val archiveCount = configuration.getInteger(JobManagerOptions.WEB_ARCHIVE_COUNT)
+ val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR)
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 97117d2..aad3a4b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1661,6 +1661,10 @@ public abstract class StreamExecutionEnvironment {
public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
checkNotNull(conf, "conf");
+ if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
+ int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
+ conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
+ }
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 437dd5f..f96ab3d 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.messages.TaskManagerMessages;
@@ -149,8 +148,8 @@ public class TestBaseUtils extends TestLogger {
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
- config.setInteger(JobManagerOptions.WEB_PORT, 8081);
- config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
+ config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 538ac98..003eb0c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -24,15 +24,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.math3.optim.nonlinear.vector.JacobianMultivariateVectorOptimizer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.JobManagerCliOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -87,7 +84,7 @@ public class WebFrontendITCase extends TestLogger {
Files.createFile(logFile.toPath());
Files.createFile(outFile.toPath());
- config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
cluster = new LocalFlinkMiniCluster(config, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/2baac899/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 64417f6..b62f957 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -372,7 +372,8 @@ public class YarnApplicationMasterRunner {
LOG);
String protocol = "http://";
- if (config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
+ if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
protocol = "https://";
}
final String webMonitorURL = webMonitor == null ? null :