You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/21 20:37:59 UTC

[3/6] flink git commit: [FLINK-6144] [config] Port JobManager configuration options to ConfigOption

[FLINK-6144] [config] Port JobManager configuration options to ConfigOption

This PR ports the existing JobManager configuration options to the JobManagerOptions class
using the ConfigOption abstraction.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2ca1295
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2ca1295
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2ca1295

Branch: refs/heads/master
Commit: e2ca12957435e9ac3d811ebec611c4465525dea9
Parents: 0236992
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 21 17:07:26 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 21 21:37:21 2017 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  66 ++++++++-
 .../flink/configuration/JobManagerOptions.java  | 142 +++++++++++++++++++
 .../executiongraph/ExecutionJobVertex.java      |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java |   2 +-
 .../runtime/jobmanager/JobManagerOptions.java   |  38 -----
 5 files changed, 204 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c7c8b1a..318c7e0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -117,13 +117,19 @@ public final class ConfigConstants {
 	/**
 	 * The config parameter defining the network address to connect to
 	 * for communication with the job manager.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#ADDRESS} instead
 	 */
+	@Deprecated
 	public static final String JOB_MANAGER_IPC_ADDRESS_KEY = "jobmanager.rpc.address";
 
 	/**
 	 * The config parameter defining the network port to connect to
 	 * for communication with the job manager.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#PORT} instead
 	 */
+	@Deprecated
 	public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port";
 
 	/**
@@ -570,36 +576,59 @@ public final class ConfigConstants {
 
 	/**
 	 * The port for the runtime monitor web-frontend server.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_PORT} instead.
 	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
 
 	/**
 	 * Config parameter to override SSL support for the JobManager Web UI
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_SSL_ENABLED} instead.
 	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_SSL_ENABLED = "jobmanager.web.ssl.enabled";
 
 	/**
 	 * The config parameter defining the flink web directory to be used by the webmonitor.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_TMP_DIR} instead.
 	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_TMPDIR_KEY = "jobmanager.web.tmpdir";
 
 	/**
 	 * The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory
 	 * will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_UPLOAD_DIR} instead.
 	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_UPLOAD_DIR_KEY = "jobmanager.web.upload.dir";
 
 	/**
 	 * The config parameter defining the number of archived jobs for the jobmanager
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead.
 	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";
 
 	/**
 	 * The log file location (may be in /log for standalone but under log directory when using YARN)
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_LOG_PATH} instead.
 	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path";
 
-	/** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
+	/**
+	 * Config parameter indicating whether jobs can be uploaded and run from the web-frontend.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead.
+	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY = "jobmanager.web.submit.enable";
 
 	/**
@@ -610,19 +639,44 @@ public final class ConfigConstants {
 	@Deprecated
 	public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = "jobmanager.web.checkpoints.disable";
 
-	/** Config parameter defining the number of checkpoints to remember for recent history. */
+	/**
+	 * Config parameter defining the number of checkpoints to remember for recent history.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead.
+	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = "jobmanager.web.checkpoints.history";
 
-	/** Time after which cached stats are cleaned up if not accessed. */
+	/**
+	 * Time after which cached stats are cleaned up if not accessed.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead.
+	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = "jobmanager.web.backpressure.cleanup-interval";
 
-	/** Time after which available stats are deprecated and need to be refreshed (by resampling). */
+	/**
+	 * Time after which available stats are deprecated and need to be refreshed (by resampling).
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead.
+	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = "jobmanager.web.backpressure.refresh-interval";
 
-	/** Number of stack trace samples to take to determine back pressure. */
+	/**
+	 * Number of stack trace samples to take to determine back pressure.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead.
+	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = "jobmanager.web.backpressure.num-samples";
 
-	/** Delay between stack trace samples to determine back pressure. */
+	/**
+	 * Delay between stack trace samples to determine back pressure.
+	 *
+	 * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} instead.
+	 */
+	@Deprecated
 	public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = "jobmanager.web.backpressure.delay-between-samples";
 
 	// ------------------------------ AKKA ------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
new file mode 100644
index 0000000..2bc2498
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Configuration options for the JobManager.
+ */
+@PublicEvolving
+public class JobManagerOptions {
+
+	/**
+	 * The config parameter defining the network address to connect to
+	 * for communication with the job manager.
+	 */
+	public static final ConfigOption<String> ADDRESS = ConfigOptions
+		.key("jobmanager.rpc.address")
+		.noDefaultValue();
+
+	/**
+	 * The config parameter defining the network port to connect to
+	 * for communication with the job manager.
+	 */
+	public static final ConfigOption<Integer> PORT = ConfigOptions
+		.key("jobmanager.rpc.port")
+		.defaultValue(6123);
+
+	/**
+	 * The port for the runtime monitor web-frontend server.
+	 */
+	public static final ConfigOption<Integer> WEB_PORT = ConfigOptions
+		.key("jobmanager.web.port")
+		.defaultValue(8081);
+
+	/**
+	 * Config parameter to override SSL support for the JobManager Web UI
+	 */
+	public static final ConfigOption<Boolean> WEB_SSL_ENABLED = ConfigOptions
+		.key("jobmanager.web.ssl.enabled")
+		.defaultValue(true);
+
+	/**
+	 * The config parameter defining the flink web directory to be used by the webmonitor.
+	 */
+	public static final ConfigOption<String> WEB_TMP_DIR = ConfigOptions
+		.key("jobmanager.web.tmpdir")
+		.defaultValue(System.getProperty("java.io.tmpdir"));
+
+	/**
+	 * The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory
+	 * will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY.
+	 */
+	public static final ConfigOption<String> WEB_UPLOAD_DIR = ConfigOptions
+		.key("jobmanager.web.upload.dir")
+		.noDefaultValue();
+
+	/**
+	 * The config parameter defining the number of archived jobs for the jobmanager.
+	 */
+	public static final ConfigOption<Integer> WEB_ARCHIVE_COUNT = ConfigOptions
+		.key("jobmanager.web.history")
+		.defaultValue(5);
+
+	/**
+	 * The log file location (may be in /log for standalone but under log directory when using YARN).
+	 */
+	public static final ConfigOption<String> WEB_LOG_PATH = ConfigOptions
+		.key("jobmanager.web.log.path")
+		.noDefaultValue();
+
+	/**
+	 * Config parameter indicating whether jobs can be uploaded and run from the web-frontend.
+	 */
+	public static final ConfigOption<Boolean> WEB_SUBMIT_ENABLE = ConfigOptions
+		.key("jobmanager.web.submit.enable")
+		.defaultValue(true);
+
+	/**
+	 * Config parameter defining the number of checkpoints to remember for recent history.
+	 */
+	public static final ConfigOption<Integer> WEB_CHECKPOINTS_HISTORY_SIZE = ConfigOptions
+		.key("jobmanager.web.checkpoints.history")
+		.defaultValue(10);
+
+	/**
+	 * Time after which cached stats are cleaned up if not accessed.
+	 */
+	public static final ConfigOption<Integer> WEB_BACKPRESSURE_CLEANUP_INTERVAL = ConfigOptions
+		.key("jobmanager.web.backpressure.cleanup-interval")
+		.defaultValue(10 * 60 * 1000);
+
+	/**
+	 * Time after which available stats are deprecated and need to be refreshed (by resampling).
+	 */
+	public static final ConfigOption<Integer> WEB_BACKPRESSURE_REFRESH_INTERVAL = ConfigOptions
+		.key("jobmanager.web.backpressure.refresh-interval")
+		.defaultValue(60 * 1000);
+
+	/**
+	 * Number of stack trace samples to take to determine back pressure.
+	 */
+	public static final ConfigOption<Integer> WEB_BACKPRESSURE_NUM_SAMPLES = ConfigOptions
+		.key("jobmanager.web.backpressure.num-samples")
+		.defaultValue(100);
+
+	/**
+	 * Delay between stack trace samples to determine back pressure.
+	 */
+	public static final ConfigOption<Integer> WEB_BACKPRESSURE_DELAY = ConfigOptions
+		.key("jobmanager.web.backpressure.delay-between-samples")
+		.defaultValue(50);
+
+	/**
+	 * The maximum number of prior execution attempts kept in history.
+	 */
+	public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE = ConfigOptions
+		.key("job-manager.max-attempts-history-size")
+		.defaultValue(16);
+
+	// ---------------------------------------------------------------------------------------------
+
+	private JobManagerOptions() {
+		throw new IllegalAccessError();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 545315f..c9b25bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 9693b97..c7829fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.state.TaskStateHandles;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
deleted file mode 100644
index 279a70e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigOption;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-@PublicEvolving
-public class JobManagerOptions {
-
-	/**
-	 * The maximum number of prior execution attempts kept in history.
-	 */
-	public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
-			key("job-manager.max-attempts-history-size").defaultValue(16);
-
-	private JobManagerOptions() {
-		throw new IllegalAccessError();
-	}
-}