You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/21 20:37:59 UTC
[3/6] flink git commit: [FLINK-6144] [config] Port JobManager
configuration options to ConfigOption
[FLINK-6144] [config] Port JobManager configuration options to ConfigOption
This PR ports the existing JobManager configuration options to the JobManagerOptions class
using the ConfigOption abstraction.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2ca1295
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2ca1295
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2ca1295
Branch: refs/heads/master
Commit: e2ca12957435e9ac3d811ebec611c4465525dea9
Parents: 0236992
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 21 17:07:26 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 21 21:37:21 2017 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 66 ++++++++-
.../flink/configuration/JobManagerOptions.java | 142 +++++++++++++++++++
.../executiongraph/ExecutionJobVertex.java | 2 +-
.../runtime/executiongraph/ExecutionVertex.java | 2 +-
.../runtime/jobmanager/JobManagerOptions.java | 38 -----
5 files changed, 204 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c7c8b1a..318c7e0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -117,13 +117,19 @@ public final class ConfigConstants {
/**
* The config parameter defining the network address to connect to
* for communication with the job manager.
+ *
+ * @deprecated Use {@link JobManagerOptions#ADDRESS} instead
*/
+ @Deprecated
public static final String JOB_MANAGER_IPC_ADDRESS_KEY = "jobmanager.rpc.address";
/**
* The config parameter defining the network port to connect to
* for communication with the job manager.
+ *
+ * @deprecated Use {@link JobManagerOptions#PORT} instead
*/
+ @Deprecated
public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port";
/**
@@ -570,36 +576,59 @@ public final class ConfigConstants {
/**
* The port for the runtime monitor web-frontend server.
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_PORT} instead.
*/
+ @Deprecated
public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
/**
* Config parameter to override SSL support for the JobManager Web UI
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_SSL_ENABLED} instead.
*/
+ @Deprecated
public static final String JOB_MANAGER_WEB_SSL_ENABLED = "jobmanager.web.ssl.enabled";
/**
* The config parameter defining the flink web directory to be used by the webmonitor.
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_TMP_DIR} instead.
*/
+ @Deprecated
public static final String JOB_MANAGER_WEB_TMPDIR_KEY = "jobmanager.web.tmpdir";
/**
* The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory
* will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY.
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_UPLOAD_DIR} instead.
*/
+ @Deprecated
public static final String JOB_MANAGER_WEB_UPLOAD_DIR_KEY = "jobmanager.web.upload.dir";
/**
* The config parameter defining the number of archived jobs for the jobmanager
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead.
*/
+ @Deprecated
public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";
/**
* The log file location (may be in /log for standalone but under log directory when using YARN)
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_LOG_PATH} instead.
*/
+ @Deprecated
public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path";
- /** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
+ /**
+ * Config parameter indicating whether jobs can be uploaded and run from the web-frontend.
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead.
+ */
+ @Deprecated
public static final String JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY = "jobmanager.web.submit.enable";
/**
@@ -610,19 +639,44 @@ public final class ConfigConstants {
@Deprecated
public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = "jobmanager.web.checkpoints.disable";
- /** Config parameter defining the number of checkpoints to remember for recent history. */
+ /**
+ * Config parameter defining the number of checkpoints to remember for recent history.
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead.
+ */
+ @Deprecated
public static final String JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = "jobmanager.web.checkpoints.history";
- /** Time after which cached stats are cleaned up if not accessed. */
+ /**
+ * Time after which cached stats are cleaned up if not accessed.
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead.
+ */
+ @Deprecated
public static final String JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = "jobmanager.web.backpressure.cleanup-interval";
- /** Time after which available stats are deprecated and need to be refreshed (by resampling). */
+ /**
+ * Time after which available stats are deprecated and need to be refreshed (by resampling).
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead.
+ */
+ @Deprecated
public static final String JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = "jobmanager.web.backpressure.refresh-interval";
- /** Number of stack trace samples to take to determine back pressure. */
+ /**
+ * Number of stack trace samples to take to determine back pressure.
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead.
+ */
+ @Deprecated
public static final String JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = "jobmanager.web.backpressure.num-samples";
- /** Delay between stack trace samples to determine back pressure. */
+ /**
+ * Delay between stack trace samples to determine back pressure.
+ *
+ * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} instead.
+ */
+ @Deprecated
public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = "jobmanager.web.backpressure.delay-between-samples";
// ------------------------------ AKKA ------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
new file mode 100644
index 0000000..2bc2498
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Configuration options for the JobManager.
+ */
+@PublicEvolving
+public class JobManagerOptions {
+
+ /**
+ * The config parameter defining the network address to connect to
+ * for communication with the job manager.
+ */
+ public static final ConfigOption<String> ADDRESS = ConfigOptions
+ .key("jobmanager.rpc.address")
+ .noDefaultValue();
+
+ /**
+ * The config parameter defining the network port to connect to
+ * for communication with the job manager.
+ */
+ public static final ConfigOption<Integer> PORT = ConfigOptions
+ .key("jobmanager.rpc.port")
+ .defaultValue(6123);
+
+ /**
+ * The port for the runtime monitor web-frontend server.
+ */
+ public static final ConfigOption<Integer> WEB_PORT = ConfigOptions
+ .key("jobmanager.web.port")
+ .defaultValue(8081);
+
+ /**
+ * Config parameter to override SSL support for the JobManager Web UI
+ */
+ public static final ConfigOption<Boolean> WEB_SSL_ENABLED = ConfigOptions
+ .key("jobmanager.web.ssl.enabled")
+ .defaultValue(true);
+
+ /**
+ * The config parameter defining the flink web directory to be used by the webmonitor.
+ */
+ public static final ConfigOption<String> WEB_TMP_DIR = ConfigOptions
+ .key("jobmanager.web.tmpdir")
+ .defaultValue(System.getProperty("java.io.tmpdir"));
+
+ /**
+ * The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory
+ * will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY.
+ */
+ public static final ConfigOption<String> WEB_UPLOAD_DIR = ConfigOptions
+ .key("jobmanager.web.upload.dir")
+ .noDefaultValue();
+
+ /**
+ * The config parameter defining the number of archived jobs for the jobmanager.
+ */
+ public static final ConfigOption<Integer> WEB_ARCHIVE_COUNT = ConfigOptions
+ .key("jobmanager.web.history")
+ .defaultValue(5);
+
+ /**
+ * The log file location (may be in /log for standalone but under log directory when using YARN).
+ */
+ public static final ConfigOption<String> WEB_LOG_PATH = ConfigOptions
+ .key("jobmanager.web.log.path")
+ .noDefaultValue();
+
+ /**
+ * Config parameter indicating whether jobs can be uploaded and run from the web-frontend.
+ */
+ public static final ConfigOption<Boolean> WEB_SUBMIT_ENABLE = ConfigOptions
+ .key("jobmanager.web.submit.enable")
+ .defaultValue(true);
+
+ /**
+ * Config parameter defining the number of checkpoints to remember for recent history.
+ */
+ public static final ConfigOption<Integer> WEB_CHECKPOINTS_HISTORY_SIZE = ConfigOptions
+ .key("jobmanager.web.checkpoints.history")
+ .defaultValue(10);
+
+ /**
+ * Time after which cached stats are cleaned up if not accessed.
+ */
+ public static final ConfigOption<Integer> WEB_BACKPRESSURE_CLEANUP_INTERVAL = ConfigOptions
+ .key("jobmanager.web.backpressure.cleanup-interval")
+ .defaultValue(10 * 60 * 1000);
+
+ /**
+ * Time after which available stats are deprecated and need to be refreshed (by resampling).
+ */
+ public static final ConfigOption<Integer> WEB_BACKPRESSURE_REFRESH_INTERVAL = ConfigOptions
+ .key("jobmanager.web.backpressure.refresh-interval")
+ .defaultValue(60 * 1000);
+
+ /**
+ * Number of stack trace samples to take to determine back pressure.
+ */
+ public static final ConfigOption<Integer> WEB_BACKPRESSURE_NUM_SAMPLES = ConfigOptions
+ .key("jobmanager.web.backpressure.num-samples")
+ .defaultValue(100);
+
+ /**
+ * Delay between stack trace samples to determine back pressure.
+ */
+ public static final ConfigOption<Integer> WEB_BACKPRESSURE_DELAY = ConfigOptions
+ .key("jobmanager.web.backpressure.delay-between-samples")
+ .defaultValue(50);
+
+ /**
+ * The maximum number of prior execution attempts kept in history.
+ */
+ public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE = ConfigOptions
+ .key("job-manager.max-attempts-history-size")
+ .defaultValue(16);
+
+ // ---------------------------------------------------------------------------------------------
+
+ private JobManagerOptions() {
+ throw new IllegalAccessError();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 545315f..c9b25bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 9693b97..c7829fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.state.TaskStateHandles;
http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
deleted file mode 100644
index 279a70e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigOption;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-@PublicEvolving
-public class JobManagerOptions {
-
- /**
- * The maximum number of prior execution attempts kept in history.
- */
- public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
- key("job-manager.max-attempts-history-size").defaultValue(16);
-
- private JobManagerOptions() {
- throw new IllegalAccessError();
- }
-}