You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/10/18 09:44:25 UTC
[flink] 01/02: [FLINK-13817][rest] Expose whether web submissions are enabled
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1692354fd05703da67b1d91d5db0a1f69922d8a4
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Aug 28 13:31:20 2019 +0200
[FLINK-13817][rest] Expose whether web submissions are enabled
---
docs/_includes/generated/rest_v1_dispatcher.html | 9 +++
.../runtime/webmonitor/history/HistoryServer.java | 2 +-
.../src/test/resources/rest_api_v1.snapshot | 9 +++
.../runtime/dispatcher/DispatcherRestEndpoint.java | 3 +-
.../rest/handler/RestHandlerConfiguration.java | 15 ++++-
.../handler/cluster/DashboardConfigHandler.java | 5 +-
.../rest/messages/DashboardConfiguration.java | 67 ++++++++++++++++++++--
.../runtime/webmonitor/WebMonitorEndpoint.java | 3 +-
.../rest/handler/RestHandlerConfigurationTest.java | 51 ++++++++++++++++
.../rest/messages/DashboardConfigurationTest.java | 3 +-
10 files changed, 153 insertions(+), 14 deletions(-)
diff --git a/docs/_includes/generated/rest_v1_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html
index e3fd943..08d0f69 100644
--- a/docs/_includes/generated/rest_v1_dispatcher.html
+++ b/docs/_includes/generated/rest_v1_dispatcher.html
@@ -81,6 +81,15 @@
},
"flink-revision" : {
"type" : "string"
+ },
+ "features" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration:Features",
+ "properties" : {
+ "web-submit" : {
+ "type" : "boolean"
+ }
+ }
}
}
} </code>
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 37407bb..46bccb8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -277,7 +277,7 @@ public class HistoryServer {
private void createDashboardConfigFile() throws IOException {
try (FileWriter fw = createOrGetFile(webDir, "config")) {
- fw.write(createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now())));
+ fw.write(createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now(), false)));
fw.flush();
} catch (IOException ioe) {
LOG.error("Failed to write config file.");
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index e7c4306..3fb4c59 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -48,6 +48,15 @@
},
"flink-revision" : {
"type" : "string"
+ },
+ "features" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration:Features",
+ "properties" : {
+ "web-submit" : {
+ "type" : "boolean"
+ }
+ }
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 3d52331..0bbb10f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -94,7 +93,7 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
executor,
clusterConfiguration);
- if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
+ if (restConfiguration.isWebSubmitEnabled()) {
try {
webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
leaderRetriever,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 336e3aa..b4221af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -38,11 +38,14 @@ public class RestHandlerConfiguration {
private final File webUiDir;
+ private final boolean webSubmitEnabled;
+
public RestHandlerConfiguration(
long refreshInterval,
int maxCheckpointStatisticCacheEntries,
Time timeout,
- File webUiDir) {
+ File webUiDir,
+ boolean webSubmitEnabled) {
Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
this.refreshInterval = refreshInterval;
@@ -50,6 +53,7 @@ public class RestHandlerConfiguration {
this.timeout = Preconditions.checkNotNull(timeout);
this.webUiDir = Preconditions.checkNotNull(webUiDir);
+ this.webSubmitEnabled = webSubmitEnabled;
}
public long getRefreshInterval() {
@@ -68,6 +72,10 @@ public class RestHandlerConfiguration {
return webUiDir;
}
+ public boolean isWebSubmitEnabled() {
+ return webSubmitEnabled;
+ }
+
public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);
@@ -78,10 +86,13 @@ public class RestHandlerConfiguration {
final String rootDir = "flink-web-ui";
final File webUiDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir);
+ final boolean webSubmitEnabled = configuration.getBoolean(WebOptions.SUBMIT_ENABLE);
+
return new RestHandlerConfiguration(
refreshInterval,
maxCheckpointStatisticCacheEntries,
timeout,
- webUiDir);
+ webUiDir,
+ webSubmitEnabled);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
index a981420..312dc03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
@@ -46,10 +46,11 @@ public class DashboardConfigHandler extends AbstractRestHandler<RestfulGateway,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> messageHeaders,
- long refreshInterval) {
+ long refreshInterval,
+ boolean webSubmitEnabled) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);
- dashboardConfiguration = DashboardConfiguration.from(refreshInterval, ZonedDateTime.now());
+ dashboardConfiguration = DashboardConfiguration.from(refreshInterval, ZonedDateTime.now(), webSubmitEnabled);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
index 36f49c9..b95addf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import java.time.ZonedDateTime;
@@ -41,6 +42,9 @@ public class DashboardConfiguration implements ResponseBody {
public static final String FIELD_NAME_TIMEZONE_NAME = "timezone-name";
public static final String FIELD_NAME_FLINK_VERSION = "flink-version";
public static final String FIELD_NAME_FLINK_REVISION = "flink-revision";
+ public static final String FIELD_NAME_FLINK_FEATURES = "features";
+
+ public static final String FIELD_NAME_FEATURE_WEB_SUBMIT = "web-submit";
@JsonProperty(FIELD_NAME_REFRESH_INTERVAL)
private final long refreshInterval;
@@ -57,40 +61,91 @@ public class DashboardConfiguration implements ResponseBody {
@JsonProperty(FIELD_NAME_FLINK_REVISION)
private final String flinkRevision;
+ @JsonProperty(FIELD_NAME_FLINK_FEATURES)
+ private final Features features;
+
@JsonCreator
public DashboardConfiguration(
@JsonProperty(FIELD_NAME_REFRESH_INTERVAL) long refreshInterval,
@JsonProperty(FIELD_NAME_TIMEZONE_NAME) String timeZoneName,
@JsonProperty(FIELD_NAME_TIMEZONE_OFFSET) int timeZoneOffset,
@JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion,
- @JsonProperty(FIELD_NAME_FLINK_REVISION) String flinkRevision) {
+ @JsonProperty(FIELD_NAME_FLINK_REVISION) String flinkRevision,
+ @JsonProperty(FIELD_NAME_FLINK_FEATURES) Features features) {
this.refreshInterval = refreshInterval;
this.timeZoneName = Preconditions.checkNotNull(timeZoneName);
this.timeZoneOffset = timeZoneOffset;
this.flinkVersion = Preconditions.checkNotNull(flinkVersion);
this.flinkRevision = Preconditions.checkNotNull(flinkRevision);
+ this.features = features;
}
+ @JsonIgnore
public long getRefreshInterval() {
return refreshInterval;
}
+ @JsonIgnore
public int getTimeZoneOffset() {
return timeZoneOffset;
}
+ @JsonIgnore
public String getTimeZoneName() {
return timeZoneName;
}
+ @JsonIgnore
public String getFlinkVersion() {
return flinkVersion;
}
+ @JsonIgnore
public String getFlinkRevision() {
return flinkRevision;
}
+ @JsonIgnore
+ public Features getFeatures() {
+ return features;
+ }
+
+ /**
+ * Collection of features that are enabled/disabled.
+ */
+ public static final class Features {
+
+ @JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT)
+ private final boolean webSubmitEnabled;
+
+ @JsonCreator
+ public Features(@JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) boolean webSubmitEnabled) {
+ this.webSubmitEnabled = webSubmitEnabled;
+ }
+
+ @JsonIgnore
+ public boolean isWebSubmitEnabled() {
+ return webSubmitEnabled;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Features features = (Features) o;
+ return webSubmitEnabled == features.webSubmitEnabled;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(webSubmitEnabled);
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -104,15 +159,16 @@ public class DashboardConfiguration implements ResponseBody {
timeZoneOffset == that.timeZoneOffset &&
Objects.equals(timeZoneName, that.timeZoneName) &&
Objects.equals(flinkVersion, that.flinkVersion) &&
- Objects.equals(flinkRevision, that.flinkRevision);
+ Objects.equals(flinkRevision, that.flinkRevision) &&
+ Objects.equals(features, that.features);
}
@Override
public int hashCode() {
- return Objects.hash(refreshInterval, timeZoneName, timeZoneOffset, flinkVersion, flinkRevision);
+ return Objects.hash(refreshInterval, timeZoneName, timeZoneOffset, flinkVersion, flinkRevision, features);
}
- public static DashboardConfiguration from(long refreshInterval, ZonedDateTime zonedDateTime) {
+ public static DashboardConfiguration from(long refreshInterval, ZonedDateTime zonedDateTime, boolean webSubmitEnabled) {
final String flinkVersion = EnvironmentInformation.getVersion();
@@ -131,6 +187,7 @@ public class DashboardConfiguration implements ResponseBody {
// convert zone date time into offset in order to not do the day light saving adaptions wrt the offset
zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000,
flinkVersion,
- flinkRevision);
+ flinkRevision,
+ new Features(webSubmitEnabled));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 68b9c75..1c31edb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -215,7 +215,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
timeout,
responseHeaders,
DashboardConfigurationHeaders.getInstance(),
- restConfiguration.getRefreshInterval());
+ restConfiguration.getRefreshInterval(),
+ restConfiguration.isWebSubmitEnabled());
JobIdsHandler jobIdsHandler = new JobIdsHandler(
leaderRetriever,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
new file mode 100644
index 0000000..4f389ab
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RestHandlerConfiguration}.
+ */
+public class RestHandlerConfigurationTest extends TestLogger {
+
+ @Test
+ public void testWebSubmitFeatureFlagEnabled() {
+ testWebSubmitFeatureFlag(true);
+ }
+
+ @Test
+ public void testWebSubmitFeatureFlagDisabled() {
+ testWebSubmitFeatureFlag(false);
+ }
+
+ private static void testWebSubmitFeatureFlag(boolean webSubmitEnabled) {
+ final Configuration config = new Configuration();
+ config.setBoolean(WebOptions.SUBMIT_ENABLE, webSubmitEnabled);
+
+ RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(config);
+ assertEquals(webSubmitEnabled, restHandlerConfiguration.isWebSubmitEnabled());
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
index 789310e..5490ab0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
@@ -35,6 +35,7 @@ public class DashboardConfigurationTest extends RestResponseMarshallingTestBase<
"foobar",
42,
"version",
- "revision");
+ "revision",
+ new DashboardConfiguration.Features(true));
}
}