You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/10/18 09:44:25 UTC

[flink] 01/02: [FLINK-13817][rest] Expose whether web submissions are enabled

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1692354fd05703da67b1d91d5db0a1f69922d8a4
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Aug 28 13:31:20 2019 +0200

    [FLINK-13817][rest] Expose whether web submissions are enabled
---
 docs/_includes/generated/rest_v1_dispatcher.html   |  9 +++
 .../runtime/webmonitor/history/HistoryServer.java  |  2 +-
 .../src/test/resources/rest_api_v1.snapshot        |  9 +++
 .../runtime/dispatcher/DispatcherRestEndpoint.java |  3 +-
 .../rest/handler/RestHandlerConfiguration.java     | 15 ++++-
 .../handler/cluster/DashboardConfigHandler.java    |  5 +-
 .../rest/messages/DashboardConfiguration.java      | 67 ++++++++++++++++++++--
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  3 +-
 .../rest/handler/RestHandlerConfigurationTest.java | 51 ++++++++++++++++
 .../rest/messages/DashboardConfigurationTest.java  |  3 +-
 10 files changed, 153 insertions(+), 14 deletions(-)

diff --git a/docs/_includes/generated/rest_v1_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html
index e3fd943..08d0f69 100644
--- a/docs/_includes/generated/rest_v1_dispatcher.html
+++ b/docs/_includes/generated/rest_v1_dispatcher.html
@@ -81,6 +81,15 @@
     },
     "flink-revision" : {
       "type" : "string"
+    },
+    "features" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration:Features",
+      "properties" : {
+        "web-submit" : {
+          "type" : "boolean"
+        }
+      }
     }
   }
 }            </code>
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 37407bb..46bccb8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -277,7 +277,7 @@ public class HistoryServer {
 
 	private void createDashboardConfigFile() throws IOException {
 		try (FileWriter fw = createOrGetFile(webDir, "config")) {
-			fw.write(createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now())));
+			fw.write(createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now(), false)));
 			fw.flush();
 		} catch (IOException ioe) {
 			LOG.error("Failed to write config file.");
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index e7c4306..3fb4c59 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -48,6 +48,15 @@
         },
         "flink-revision" : {
           "type" : "string"
+        },
+        "features" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration:Features",
+          "properties" : {
+            "web-submit" : {
+              "type" : "boolean"
+            }
+          }
         }
       }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 3d52331..0bbb10f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -94,7 +93,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			executor,
 			clusterConfiguration);
 
-		if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
+		if (restConfiguration.isWebSubmitEnabled()) {
 			try {
 				webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
 					leaderRetriever,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 336e3aa..b4221af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -38,11 +38,14 @@ public class RestHandlerConfiguration {
 
 	private final File webUiDir;
 
+	private final boolean webSubmitEnabled;
+
 	public RestHandlerConfiguration(
 			long refreshInterval,
 			int maxCheckpointStatisticCacheEntries,
 			Time timeout,
-			File webUiDir) {
+			File webUiDir,
+			boolean webSubmitEnabled) {
 		Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
 		this.refreshInterval = refreshInterval;
 
@@ -50,6 +53,7 @@ public class RestHandlerConfiguration {
 
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.webUiDir = Preconditions.checkNotNull(webUiDir);
+		this.webSubmitEnabled = webSubmitEnabled;
 	}
 
 	public long getRefreshInterval() {
@@ -68,6 +72,10 @@ public class RestHandlerConfiguration {
 		return webUiDir;
 	}
 
+	public boolean isWebSubmitEnabled() {
+		return webSubmitEnabled;
+	}
+
 	public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
 		final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);
 
@@ -78,10 +86,13 @@ public class RestHandlerConfiguration {
 		final String rootDir = "flink-web-ui";
 		final File webUiDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir);
 
+		final boolean webSubmitEnabled = configuration.getBoolean(WebOptions.SUBMIT_ENABLE);
+
 		return new RestHandlerConfiguration(
 			refreshInterval,
 			maxCheckpointStatisticCacheEntries,
 			timeout,
-			webUiDir);
+			webUiDir,
+			webSubmitEnabled);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
index a981420..312dc03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
@@ -46,10 +46,11 @@ public class DashboardConfigHandler extends AbstractRestHandler<RestfulGateway,
 			Time timeout,
 			Map<String, String> responseHeaders,
 			MessageHeaders<EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> messageHeaders,
-			long refreshInterval) {
+			long refreshInterval,
+			boolean webSubmitEnabled) {
 		super(leaderRetriever, timeout, responseHeaders, messageHeaders);
 
-		dashboardConfiguration = DashboardConfiguration.from(refreshInterval, ZonedDateTime.now());
+		dashboardConfiguration = DashboardConfiguration.from(refreshInterval, ZonedDateTime.now(), webSubmitEnabled);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
index 36f49c9..b95addf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.time.ZonedDateTime;
@@ -41,6 +42,9 @@ public class DashboardConfiguration implements ResponseBody {
 	public static final String FIELD_NAME_TIMEZONE_NAME = "timezone-name";
 	public static final String FIELD_NAME_FLINK_VERSION = "flink-version";
 	public static final String FIELD_NAME_FLINK_REVISION = "flink-revision";
+	public static final String FIELD_NAME_FLINK_FEATURES = "features";
+
+	public static final String FIELD_NAME_FEATURE_WEB_SUBMIT = "web-submit";
 
 	@JsonProperty(FIELD_NAME_REFRESH_INTERVAL)
 	private final long refreshInterval;
@@ -57,40 +61,91 @@ public class DashboardConfiguration implements ResponseBody {
 	@JsonProperty(FIELD_NAME_FLINK_REVISION)
 	private final String flinkRevision;
 
+	@JsonProperty(FIELD_NAME_FLINK_FEATURES)
+	private final Features features;
+
 	@JsonCreator
 	public DashboardConfiguration(
 			@JsonProperty(FIELD_NAME_REFRESH_INTERVAL) long refreshInterval,
 			@JsonProperty(FIELD_NAME_TIMEZONE_NAME) String timeZoneName,
 			@JsonProperty(FIELD_NAME_TIMEZONE_OFFSET) int timeZoneOffset,
 			@JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion,
-			@JsonProperty(FIELD_NAME_FLINK_REVISION) String flinkRevision) {
+			@JsonProperty(FIELD_NAME_FLINK_REVISION) String flinkRevision,
+			@JsonProperty(FIELD_NAME_FLINK_FEATURES) Features features) {
 		this.refreshInterval = refreshInterval;
 		this.timeZoneName = Preconditions.checkNotNull(timeZoneName);
 		this.timeZoneOffset = timeZoneOffset;
 		this.flinkVersion = Preconditions.checkNotNull(flinkVersion);
 		this.flinkRevision = Preconditions.checkNotNull(flinkRevision);
+		this.features = features;
 	}
 
+	@JsonIgnore
 	public long getRefreshInterval() {
 		return refreshInterval;
 	}
 
+	@JsonIgnore
 	public int getTimeZoneOffset() {
 		return timeZoneOffset;
 	}
 
+	@JsonIgnore
 	public String getTimeZoneName() {
 		return timeZoneName;
 	}
 
+	@JsonIgnore
 	public String getFlinkVersion() {
 		return flinkVersion;
 	}
 
+	@JsonIgnore
 	public String getFlinkRevision() {
 		return flinkRevision;
 	}
 
+	@JsonIgnore
+	public Features getFeatures() {
+		return features;
+	}
+
+	/**
+	 * Collection of features that are enabled/disabled.
+	 */
+	public static final class Features {
+
+		@JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT)
+		private final boolean webSubmitEnabled;
+
+		@JsonCreator
+		public Features(@JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) boolean webSubmitEnabled) {
+			this.webSubmitEnabled = webSubmitEnabled;
+		}
+
+		@JsonIgnore
+		public boolean isWebSubmitEnabled() {
+			return webSubmitEnabled;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			Features features = (Features) o;
+			return webSubmitEnabled == features.webSubmitEnabled;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(webSubmitEnabled);
+		}
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -104,15 +159,16 @@ public class DashboardConfiguration implements ResponseBody {
 			timeZoneOffset == that.timeZoneOffset &&
 			Objects.equals(timeZoneName, that.timeZoneName) &&
 			Objects.equals(flinkVersion, that.flinkVersion) &&
-			Objects.equals(flinkRevision, that.flinkRevision);
+			Objects.equals(flinkRevision, that.flinkRevision) &&
+			Objects.equals(features, that.features);
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(refreshInterval, timeZoneName, timeZoneOffset, flinkVersion, flinkRevision);
+		return Objects.hash(refreshInterval, timeZoneName, timeZoneOffset, flinkVersion, flinkRevision, features);
 	}
 
-	public static DashboardConfiguration from(long refreshInterval, ZonedDateTime zonedDateTime) {
+	public static DashboardConfiguration from(long refreshInterval, ZonedDateTime zonedDateTime, boolean webSubmitEnabled) {
 
 		final String flinkVersion = EnvironmentInformation.getVersion();
 
@@ -131,6 +187,7 @@ public class DashboardConfiguration implements ResponseBody {
 			// convert zone date time into offset in order to not do the day light saving adaptions wrt the offset
 			zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000,
 			flinkVersion,
-			flinkRevision);
+			flinkRevision,
+			new Features(webSubmitEnabled));
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 68b9c75..1c31edb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -215,7 +215,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			timeout,
 			responseHeaders,
 			DashboardConfigurationHeaders.getInstance(),
-			restConfiguration.getRefreshInterval());
+			restConfiguration.getRefreshInterval(),
+			restConfiguration.isWebSubmitEnabled());
 
 		JobIdsHandler jobIdsHandler = new JobIdsHandler(
 			leaderRetriever,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
new file mode 100644
index 0000000..4f389ab
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RestHandlerConfiguration}.
+ */
+public class RestHandlerConfigurationTest extends TestLogger {
+
+	@Test
+	public void testWebSubmitFeatureFlagEnabled() {
+		testWebSubmitFeatureFlag(true);
+	}
+
+	@Test
+	public void testWebSubmitFeatureFlagDisabled() {
+		testWebSubmitFeatureFlag(false);
+	}
+
+	private static void testWebSubmitFeatureFlag(boolean webSubmitEnabled) {
+		final Configuration config = new Configuration();
+		config.setBoolean(WebOptions.SUBMIT_ENABLE, webSubmitEnabled);
+
+		RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(config);
+		assertEquals(webSubmitEnabled, restHandlerConfiguration.isWebSubmitEnabled());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
index 789310e..5490ab0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
@@ -35,6 +35,7 @@ public class DashboardConfigurationTest extends RestResponseMarshallingTestBase<
 			"foobar",
 			42,
 			"version",
-			"revision");
+			"revision",
+			new DashboardConfiguration.Features(true));
 	}
 }