You are viewing a plain-text version of this content; the canonical link that accompanied it is not preserved in this copy.
Posted to commits@flink.apache.org by gu...@apache.org on 2022/07/06 09:10:05 UTC

[flink] branch master updated (85ba36fc012 -> adfe97deb5c)

This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 85ba36fc012 [FLINK-28404][tests] Invert isAssignableFrom order
     new ebdde651eda [hotfix][rest] Rename ClusterConfigurationInfo to ConfigurationInfo
     new 1449e8da48b [FLINK-28311][rest] Introduce JobManagerEnvironmentHandler
     new 4ab2536b968 [FLINK-28311][rest] Introduce JobManagerJobConfigurationHandler
     new adfe97deb5c [FLINK-28311][rest] Introduce JobManagerJobEnvironmentHandler

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../shortcodes/generated/rest_v1_dispatcher.html   | 280 ++++++++++++++++++++-
 docs/static/generated/rest_v1_dispatcher.yml       | 101 +++++++-
 .../src/test/resources/rest_api_v1.snapshot        | 153 ++++++++++-
 .../rest/handler/cluster/ClusterConfigHandler.java |  15 +-
 ...dler.java => JobManagerEnvironmentHandler.java} |  30 +--
 .../JobManagerJobConfigurationHandler.java}        |  50 ++--
 .../job/JobManagerJobEnvironmentHandler.java       |  82 ++++++
 .../messages/ClusterConfigurationInfoHeaders.java  |   7 +-
 ...nfigurationInfo.java => ConfigurationInfo.java} |  15 +-
 ...nInfoEntry.java => ConfigurationInfoEntry.java} |   8 +-
 .../runtime/rest/messages/EnvironmentInfo.java     | 184 ++++++++++++++
 ...ders.java => JobManagerEnvironmentHeaders.java} |  48 ++--
 .../JobManagerJobConfigurationHeaders.java}        |  43 ++--
 .../JobManagerJobEnvironmentHeaders.java}          |  43 ++--
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  40 +++
 .../job/JobManagerJobConfigurationHandlerTest.java |  68 +++++
 ...ionInfoTest.java => ConfigurationInfoTest.java} |  17 +-
 ...ationInfoTest.java => EnvironmentInfoTest.java} |  18 +-
 .../yarn/YARNSessionCapacitySchedulerITCase.java   |  11 +-
 19 files changed, 1044 insertions(+), 169 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/{ClusterConfigHandler.java => JobManagerEnvironmentHandler.java} (67%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/{cluster/ClusterConfigHandler.java => job/JobManagerJobConfigurationHandler.java} (53%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobEnvironmentHandler.java
 rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/{ClusterConfigurationInfo.java => ConfigurationInfo.java} (76%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/{ClusterConfigurationInfoEntry.java => ConfigurationInfoEntry.java} (89%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EnvironmentInfo.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/{ClusterConfigurationInfoHeaders.java => JobManagerEnvironmentHeaders.java} (61%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/{ClusterConfigurationInfoHeaders.java => job/JobManagerJobConfigurationHeaders.java} (54%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/{ClusterConfigurationInfoHeaders.java => job/JobManagerJobEnvironmentHeaders.java} (54%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java
 copy flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/{ClusterConfigurationInfoTest.java => ConfigurationInfoTest.java} (61%)
 rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/{ClusterConfigurationInfoTest.java => EnvironmentInfoTest.java} (59%)


[flink] 03/04: [FLINK-28311][rest] Introduce JobManagerJobConfigurationHandler

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ab2536b968ad26aa0b672217e68a0450a47f5fd
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Mon Jul 4 13:44:10 2022 +0800

    [FLINK-28311][rest] Introduce JobManagerJobConfigurationHandler
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 76 +++++++++++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       | 23 ++++++
 .../src/test/resources/rest_api_v1.snapshot        | 31 ++++++++
 .../job/JobManagerJobConfigurationHandler.java     | 86 ++++++++++++++++++++++
 .../job/JobManagerJobConfigurationHeaders.java     | 80 ++++++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     | 14 ++++
 .../job/JobManagerJobConfigurationHandlerTest.java | 68 +++++++++++++++++
 7 files changed, 378 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 0846f9d1c1d..64ebceeae89 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3021,6 +3021,82 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/jobmanager/config</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns the jobmanager's configuration of a specific job.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Request</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Response</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{
+  "type" : "array",
+  "items" : {
+    "type" : "object",
+    "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
+    "properties" : {
+      "key" : {
+        "type" : "string"
+      },
+      "value" : {
+        "type" : "string"
+      }
+    }
+  }
+}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 3f9d37dace0..ecb6b964f33 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -636,6 +636,29 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/JobExecutionResultResponseBody'
+  /jobs/{jobid}/jobmanager/config:
+    get:
+      description: Returns the jobmanager's configuration of a specific job.
+      operationId: getJobManagerJobConfiguration
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a job.
+        required: true
+        schema:
+          $ref: '#/components/schemas/JobID'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                type: array
+                properties:
+                  empty:
+                    type: boolean
+                items:
+                  $ref: '#/components/schemas/ConfigurationInfoEntry'
   /jobs/{jobid}/metrics:
     get:
       description: Provides access to job metrics.
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 37a2482703c..87aedec6d18 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1887,6 +1887,37 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/jobmanager/config",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "any"
+    },
+    "response" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
+        "properties" : {
+          "key" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
   }, {
     "url" : "/jobs/:jobid/metrics",
     "method" : "GET",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandler.java
new file mode 100644
index 00000000000..2e0c2825054
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler which serves the jobmanager's configuration of a specific job. */
+public class JobManagerJobConfigurationHandler
+        extends AbstractRestHandler<
+                RestfulGateway, EmptyRequestBody, ConfigurationInfo, JobMessageParameters>
+        implements JsonArchivist {
+    private final ConfigurationInfo jobConfig;
+
+    public JobManagerJobConfigurationHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, ConfigurationInfo, JobMessageParameters>
+                    messageHeaders,
+            Configuration configuration) {
+        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+        Preconditions.checkNotNull(configuration);
+        this.jobConfig = ConfigurationInfo.from(configuration);
+    }
+
+    @Override
+    protected CompletableFuture<ConfigurationInfo> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
+            throws RestHandlerException {
+        return CompletableFuture.completedFuture(jobConfig);
+    }
+
+    @Override
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
+            throws IOException {
+        return Collections.singletonList(
+                new ArchivedJson(
+                        JobManagerJobConfigurationHeaders.getInstance()
+                                .getTargetRestEndpointURL()
+                                .replace(
+                                        ':' + JobIDPathParameter.KEY,
+                                        executionGraphInfo.getJobId().toString()),
+                        jobConfig));
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java
new file mode 100644
index 00000000000..ec8297e2b8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link
+ * org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler}.
+ */
+public class JobManagerJobConfigurationHeaders
+        implements MessageHeaders<EmptyRequestBody, ConfigurationInfo, JobMessageParameters> {
+    private static final JobManagerJobConfigurationHeaders INSTANCE =
+            new JobManagerJobConfigurationHeaders();
+
+    public static final String JOBMANAGER_JOB_CONFIG_REST_PATH = "/jobs/:jobid/jobmanager/config";
+
+    private JobManagerJobConfigurationHeaders() {}
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return JOBMANAGER_JOB_CONFIG_REST_PATH;
+    }
+
+    @Override
+    public Class<ConfigurationInfo> getResponseClass() {
+        return ConfigurationInfo.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public JobMessageParameters getUnresolvedMessageParameters() {
+        return new JobMessageParameters();
+    }
+
+    public static JobManagerJobConfigurationHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the jobmanager's configuration of a specific job.";
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index a77c3d5217a..b3fead6af48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler;
 import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobStatusHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
@@ -123,6 +124,7 @@ import org.apache.flink.runtime.rest.messages.cluster.JobManagerStdoutFileHeader
 import org.apache.flink.runtime.rest.messages.cluster.JobManagerThreadDumpHeaders;
 import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
@@ -335,6 +337,14 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                         executionGraphCache,
                         executor);
 
+        JobManagerJobConfigurationHandler jobManagerJobConfigurationHandler =
+                new JobManagerJobConfigurationHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobManagerJobConfigurationHeaders.getInstance(),
+                        clusterConfiguration);
+
         CheckpointConfigHandler checkpointConfigHandler =
                 new CheckpointConfigHandler(
                         leaderRetriever,
@@ -762,6 +772,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                 Tuple2.of(
                         jobVertexBackPressureHandler.getMessageHeaders(),
                         jobVertexBackPressureHandler));
+        handlers.add(
+                Tuple2.of(
+                        jobManagerJobConfigurationHandler.getMessageHeaders(),
+                        jobManagerJobConfigurationHandler));
 
         final AbstractRestHandler<?, ?, ?, ?> jobVertexFlameGraphHandler;
         if (clusterConfiguration.get(RestOptions.ENABLE_FLAMEGRAPH)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java
new file mode 100644
index 00000000000..855bf4724a6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for the {@link JobManagerJobConfigurationHandler}. */
+public class JobManagerJobConfigurationHandlerTest extends TestLogger {
+
+    @Test
+    public void testRequestConfiguration() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.ADDRESS, "address");
+
+        final JobManagerJobConfigurationHandler handler =
+                new JobManagerJobConfigurationHandler(
+                        () -> null,
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        JobManagerJobConfigurationHeaders.getInstance(),
+                        configuration);
+
+        final ConfigurationInfo configurationInfo =
+                handler.handleRequest(
+                                HandlerRequest.resolveParametersAndCreate(
+                                        EmptyRequestBody.getInstance(),
+                                        new JobMessageParameters(),
+                                        Collections.emptyMap(),
+                                        Collections.emptyMap(),
+                                        Collections.emptyList()),
+                                new TestingRestfulGateway.Builder().build())
+                        .get();
+
+        assertEquals(JobManagerOptions.ADDRESS.key(), configurationInfo.get(0).getKey());
+        assertEquals("address", configurationInfo.get(0).getValue());
+    }
+}


[flink] 01/04: [hotfix][rest] Rename ClusterConfigurationInfo to ConfigurationInfo

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ebdde651edae8db6b2ac740f07d97124dc01fea4
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Mon Jul 4 13:20:05 2022 +0800

    [hotfix][rest] Rename ClusterConfigurationInfo to ConfigurationInfo
---
 .../shortcodes/generated/rest_v1_dispatcher.html       |  2 +-
 docs/static/generated/rest_v1_dispatcher.yml           | 18 +++++++++---------
 .../src/test/resources/rest_api_v1.snapshot            |  2 +-
 .../rest/handler/cluster/ClusterConfigHandler.java     | 15 ++++++---------
 .../rest/messages/ClusterConfigurationInfoHeaders.java |  7 +++----
 ...erConfigurationInfo.java => ConfigurationInfo.java} | 15 ++++++---------
 ...ationInfoEntry.java => ConfigurationInfoEntry.java} |  8 ++++----
 ...urationInfoTest.java => ConfigurationInfoTest.java} | 17 ++++++++---------
 .../flink/yarn/YARNSessionCapacitySchedulerITCase.java | 11 +++++------
 9 files changed, 43 insertions(+), 52 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index eb862ac83bf..3683cbca3d6 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -857,7 +857,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
   "type" : "array",
   "items" : {
     "type" : "object",
-    "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ClusterConfigurationInfoEntry",
+    "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
     "properties" : {
       "key" : {
         "type" : "string"
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 2836526421e..d8c0cb3b8e8 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -272,7 +272,7 @@ paths:
                   empty:
                     type: boolean
                 items:
-                  $ref: '#/components/schemas/ClusterConfigurationInfoEntry'
+                  $ref: '#/components/schemas/ConfigurationInfoEntry'
   /jobmanager/logs:
     get:
       description: Returns the list of log files on the JobManager.
@@ -2137,19 +2137,19 @@ components:
           type: array
           items:
             $ref: '#/components/schemas/UserAccumulator'
+    ConfigurationInfo:
+      type: array
+      properties:
+        empty:
+          type: boolean
+      items:
+        $ref: '#/components/schemas/ConfigurationInfoEntry'
     Type:
       type: string
       enum:
       - FULL
       - ON_CPU
       - OFF_CPU
-    ClusterConfigurationInfo:
-      type: array
-      properties:
-        empty:
-          type: boolean
-      items:
-        $ref: '#/components/schemas/ClusterConfigurationInfoEntry'
     UserAccumulator:
       type: object
       properties:
@@ -2372,7 +2372,7 @@ components:
           $ref: '#/components/schemas/JobID'
         status:
           $ref: '#/components/schemas/JobStatus'
-    ClusterConfigurationInfoEntry:
+    ConfigurationInfoEntry:
       type: object
       properties:
         key:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 144f4e774eb..c390c7f600b 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -472,7 +472,7 @@
       "type" : "array",
       "items" : {
         "type" : "object",
-        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ClusterConfigurationInfoEntry",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
         "properties" : {
           "key" : {
             "type" : "string"
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
index 26d0aa67f1a..f5776588733 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterConfigHandler.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -39,28 +39,25 @@ import java.util.concurrent.CompletableFuture;
 /** Handler which serves the cluster's configuration. */
 public class ClusterConfigHandler
         extends AbstractRestHandler<
-                RestfulGateway,
-                EmptyRequestBody,
-                ClusterConfigurationInfo,
-                EmptyMessageParameters> {
+                RestfulGateway, EmptyRequestBody, ConfigurationInfo, EmptyMessageParameters> {
 
-    private final ClusterConfigurationInfo clusterConfig;
+    private final ConfigurationInfo clusterConfig;
 
     public ClusterConfigHandler(
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
             Time timeout,
             Map<String, String> responseHeaders,
-            MessageHeaders<EmptyRequestBody, ClusterConfigurationInfo, EmptyMessageParameters>
+            MessageHeaders<EmptyRequestBody, ConfigurationInfo, EmptyMessageParameters>
                     messageHeaders,
             Configuration configuration) {
         super(leaderRetriever, timeout, responseHeaders, messageHeaders);
 
         Preconditions.checkNotNull(configuration);
-        this.clusterConfig = ClusterConfigurationInfo.from(configuration);
+        this.clusterConfig = ConfigurationInfo.from(configuration);
     }
 
     @Override
-    protected CompletableFuture<ClusterConfigurationInfo> handleRequest(
+    protected CompletableFuture<ConfigurationInfo> handleRequest(
             @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
             throws RestHandlerException {
         return CompletableFuture.completedFuture(clusterConfig);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java
index 328376d8c08..1d9e80a5de6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java
@@ -25,8 +25,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 /** Message headers for the {@link ClusterConfigHandler}. */
 public final class ClusterConfigurationInfoHeaders
-        implements MessageHeaders<
-                EmptyRequestBody, ClusterConfigurationInfo, EmptyMessageParameters> {
+        implements MessageHeaders<EmptyRequestBody, ConfigurationInfo, EmptyMessageParameters> {
 
     private static final ClusterConfigurationInfoHeaders INSTANCE =
             new ClusterConfigurationInfoHeaders();
@@ -54,8 +53,8 @@ public final class ClusterConfigurationInfoHeaders
     }
 
     @Override
-    public Class<ClusterConfigurationInfo> getResponseClass() {
-        return ClusterConfigurationInfo.class;
+    public Class<ConfigurationInfo> getResponseClass() {
+        return ConfigurationInfo.class;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java
similarity index 76%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java
index 5ca1a7c9891..bcb90b95abe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java
@@ -29,29 +29,26 @@ import java.util.Map;
  * Response of the {@link ClusterConfigHandler}, represented as a list of key-value pairs of the
  * cluster {@link Configuration}.
  */
-public class ClusterConfigurationInfo extends ArrayList<ClusterConfigurationInfoEntry>
-        implements ResponseBody {
+public class ConfigurationInfo extends ArrayList<ConfigurationInfoEntry> implements ResponseBody {
 
     private static final long serialVersionUID = -1170348873871206964L;
 
     // a default constructor is required for collection type marshalling
-    public ClusterConfigurationInfo() {}
+    public ConfigurationInfo() {}
 
-    public ClusterConfigurationInfo(int initialEntries) {
+    public ConfigurationInfo(int initialEntries) {
         super(initialEntries);
     }
 
-    public static ClusterConfigurationInfo from(Configuration config) {
-        final ClusterConfigurationInfo clusterConfig =
-                new ClusterConfigurationInfo(config.keySet().size());
+    public static ConfigurationInfo from(Configuration config) {
+        final ConfigurationInfo clusterConfig = new ConfigurationInfo(config.keySet().size());
         final Map<String, String> configurationWithHiddenSensitiveValues =
                 ConfigurationUtils.hideSensitiveValues(config.toMap());
 
         for (Map.Entry<String, String> keyValuePair :
                 configurationWithHiddenSensitiveValues.entrySet()) {
             clusterConfig.add(
-                    new ClusterConfigurationInfoEntry(
-                            keyValuePair.getKey(), keyValuePair.getValue()));
+                    new ConfigurationInfoEntry(keyValuePair.getKey(), keyValuePair.getValue()));
         }
 
         return clusterConfig;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfoEntry.java
similarity index 89%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoEntry.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfoEntry.java
index 85d2f7efa37..23791fba80b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoEntry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfoEntry.java
@@ -25,8 +25,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 
 import java.util.Objects;
 
-/** A single key-value pair entry in the {@link ClusterConfigurationInfo} response. */
-public class ClusterConfigurationInfoEntry {
+/** A single key-value pair entry in the {@link ConfigurationInfo} response. */
+public class ConfigurationInfoEntry {
 
     public static final String FIELD_NAME_CONFIG_KEY = "key";
     public static final String FIELD_NAME_CONFIG_VALUE = "value";
@@ -38,7 +38,7 @@ public class ClusterConfigurationInfoEntry {
     private final String value;
 
     @JsonCreator
-    public ClusterConfigurationInfoEntry(
+    public ConfigurationInfoEntry(
             @JsonProperty(FIELD_NAME_CONFIG_KEY) String key,
             @JsonProperty(FIELD_NAME_CONFIG_VALUE) String value) {
         this.key = Preconditions.checkNotNull(key);
@@ -63,7 +63,7 @@ public class ClusterConfigurationInfoEntry {
             return false;
         }
 
-        ClusterConfigurationInfoEntry that = (ClusterConfigurationInfoEntry) o;
+        ConfigurationInfoEntry that = (ConfigurationInfoEntry) o;
         return key.equals(that.key) && value.equals(that.value);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ConfigurationInfoTest.java
similarity index 61%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ConfigurationInfoTest.java
index 1b07ae51520..f10de95a61b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ConfigurationInfoTest.java
@@ -18,20 +18,19 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-/** Tests for the {@link ClusterConfigurationInfo}. */
-public class ClusterConfigurationInfoTest
-        extends RestResponseMarshallingTestBase<ClusterConfigurationInfo> {
+/** Tests for the {@link ConfigurationInfo}. */
+public class ConfigurationInfoTest extends RestResponseMarshallingTestBase<ConfigurationInfo> {
 
     @Override
-    protected Class<ClusterConfigurationInfo> getTestResponseClass() {
-        return ClusterConfigurationInfo.class;
+    protected Class<ConfigurationInfo> getTestResponseClass() {
+        return ConfigurationInfo.class;
     }
 
     @Override
-    protected ClusterConfigurationInfo getTestResponseInstance() {
-        final ClusterConfigurationInfo expected = new ClusterConfigurationInfo(2);
-        expected.add(new ClusterConfigurationInfoEntry("key1", "value1"));
-        expected.add(new ClusterConfigurationInfoEntry("key2", "value2"));
+    protected ConfigurationInfo getTestResponseInstance() {
+        final ConfigurationInfo expected = new ConfigurationInfo(2);
+        expected.add(new ConfigurationInfoEntry("key1", "value1"));
+        expected.add(new ConfigurationInfoEntry("key2", "value2"));
 
         return expected;
     }
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 125bb3b0d91..6a0409ad0d4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -25,10 +25,10 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
-import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
-import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfoEntry;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
@@ -425,16 +425,15 @@ class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 
     private static Map<String, String> getFlinkConfig(final String host, final int port)
             throws Exception {
-        final ClusterConfigurationInfo clusterConfigurationInfoEntries =
+        final ConfigurationInfo configurationInfoEntries =
                 restClient
                         .sendRequest(host, port, ClusterConfigurationInfoHeaders.getInstance())
                         .get();
 
-        return clusterConfigurationInfoEntries.stream()
+        return configurationInfoEntries.stream()
                 .collect(
                         Collectors.toMap(
-                                ClusterConfigurationInfoEntry::getKey,
-                                ClusterConfigurationInfoEntry::getValue));
+                                ConfigurationInfoEntry::getKey, ConfigurationInfoEntry::getValue));
     }
 
     /**


[flink] 02/04: [FLINK-28311][rest] Introduce JobManagerEnvironmentHandler

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1449e8da48b5cf798fad32a71d9bbf7c927c5acf
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Wed Jun 1 16:23:29 2022 +0800

    [FLINK-28311][rest] Introduce JobManagerEnvironmentHandler
---
 .../shortcodes/generated/rest_v1_dispatcher.html   |  96 +++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       |  42 +++++
 .../src/test/resources/rest_api_v1.snapshot        |  59 +++++++
 .../cluster/JobManagerEnvironmentHandler.java      |  60 +++++++
 .../runtime/rest/messages/EnvironmentInfo.java     | 184 +++++++++++++++++++++
 .../messages/JobManagerEnvironmentHeaders.java     |  71 ++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  13 ++
 .../runtime/rest/messages/EnvironmentInfoTest.java |  32 ++++
 8 files changed, 557 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 3683cbca3d6..0846f9d1c1d 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -876,6 +876,102 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" colspan="2"><h5><strong>/jobmanager/environment</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns the jobmanager environment.</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Request</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Response</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo",
+  "properties" : {
+    "classpath" : {
+      "type" : "array",
+      "items" : {
+        "type" : "string"
+      }
+    },
+    "environment" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:EnvironmentVariableItem",
+        "properties" : {
+          "key" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    },
+    "jvm" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:JVMInfo",
+      "properties" : {
+        "arch" : {
+          "type" : "string"
+        },
+        "options" : {
+          "type" : "array",
+          "items" : {
+            "type" : "string"
+          }
+        },
+        "version" : {
+          "type" : "string"
+        }
+      }
+    }
+  }
+}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index d8c0cb3b8e8..3f9d37dace0 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -273,6 +273,17 @@ paths:
                     type: boolean
                 items:
                   $ref: '#/components/schemas/ConfigurationInfoEntry'
+  /jobmanager/environment:
+    get:
+      description: Returns the jobmanager environment.
+      operationId: getJobManagerEnvironment
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvironmentInfo'
   /jobmanager/logs:
     get:
       description: Returns the list of log files on the JobManager.
@@ -2021,6 +2032,17 @@ components:
       type: object
       allOf:
       - $ref: '#/components/schemas/SubtaskCheckpointStatistics'
+    JVMInfo:
+      type: object
+      properties:
+        version:
+          type: string
+        arch:
+          type: string
+        options:
+          type: array
+          items:
+            type: string
     LogInfo:
       type: object
       properties:
@@ -2891,6 +2913,19 @@ components:
       - FAILED
       - CANCELED
       - UNKNOWN
+    EnvironmentInfo:
+      type: object
+      properties:
+        environment:
+          type: array
+          items:
+            $ref: '#/components/schemas/EnvironmentVariableItem'
+        jvm:
+          $ref: '#/components/schemas/JVMInfo'
+        classpath:
+          type: array
+          items:
+            type: string
     Summary:
       type: object
       properties:
@@ -2918,6 +2953,13 @@ components:
           type: string
         value:
           type: string
+    EnvironmentVariableItem:
+      type: object
+      properties:
+        key:
+          type: string
+        value:
+          type: string
     TriggerResponse:
       type: object
       properties:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index c390c7f600b..37a2482703c 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -483,6 +483,65 @@
         }
       }
     }
+  }, {
+    "url" : "/jobmanager/environment",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "any"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo",
+      "properties" : {
+        "environment" : {
+          "type" : "array",
+          "items" : {
+            "type" : "object",
+            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:EnvironmentVariableItem",
+            "properties" : {
+              "key" : {
+                "type" : "string"
+              },
+              "value" : {
+                "type" : "string"
+              }
+            }
+          }
+        },
+        "jvm" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:JVMInfo",
+          "properties" : {
+            "version" : {
+              "type" : "string"
+            },
+            "arch" : {
+              "type" : "string"
+            },
+            "options" : {
+              "type" : "array",
+              "items" : {
+                "type" : "string"
+              }
+            }
+          }
+        },
+        "classpath" : {
+          "type" : "array",
+          "items" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
   }, {
     "url" : "/jobmanager/logs",
     "method" : "GET",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerEnvironmentHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerEnvironmentHandler.java
new file mode 100644
index 00000000000..362b89fc14f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerEnvironmentHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.cluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EnvironmentInfo;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler which serves the jobmanager's environment variables, JVM info and classpath. */
+public class JobManagerEnvironmentHandler
+        extends AbstractRestHandler<
+                RestfulGateway, EmptyRequestBody, EnvironmentInfo, EmptyMessageParameters> {
+    private final EnvironmentInfo environmentInfo;
+
+    public JobManagerEnvironmentHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, EnvironmentInfo, EmptyMessageParameters>
+                    messageHeaders) {
+        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+        this.environmentInfo = EnvironmentInfo.create();
+    }
+
+    @Override
+    protected CompletableFuture<EnvironmentInfo> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
+            throws RestHandlerException {
+        return CompletableFuture.completedFuture(environmentInfo);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EnvironmentInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EnvironmentInfo.java
new file mode 100644
index 00000000000..44c80388ebe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EnvironmentInfo.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response body describing the environment of the process that built it: its environment
+ * variables, JVM details and classpath entries.
+ */
+public class EnvironmentInfo implements ResponseBody {
+
+    private static final String FIELD_NAME_ENVIRONMENT_INFO = "environment";
+
+    private static final String FIELD_NAME_JVM_INFO = "jvm";
+
+    private static final String FIELD_NAME_CLASSPATH = "classpath";
+
+    @JsonProperty(FIELD_NAME_ENVIRONMENT_INFO)
+    private final List<EnvironmentVariableItem> environmentVariables;
+
+    @JsonProperty(FIELD_NAME_JVM_INFO)
+    private final JVMInfo jvmInfo;
+
+    @JsonProperty(FIELD_NAME_CLASSPATH)
+    private final List<String> classpath;
+
+    /** Creates the response from its JSON properties; values are stored as given. */
+    @JsonCreator
+    public EnvironmentInfo(
+            @JsonProperty(FIELD_NAME_ENVIRONMENT_INFO)
+                    List<EnvironmentVariableItem> environmentVariables,
+            @JsonProperty(FIELD_NAME_JVM_INFO) JVMInfo jvmInfo,
+            @JsonProperty(FIELD_NAME_CLASSPATH) List<String> classpath) {
+        this.environmentVariables = environmentVariables;
+        this.jvmInfo = jvmInfo;
+        this.classpath = classpath;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        EnvironmentInfo that = (EnvironmentInfo) o;
+        // Null-safe: the constructor does not reject null components.
+        return Objects.equals(environmentVariables, that.environmentVariables)
+                && Objects.equals(jvmInfo, that.jvmInfo)
+                && Objects.equals(classpath, that.classpath);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(environmentVariables, jvmInfo, classpath);
+    }
+
+    /**
+     * Captures the environment of the current process.
+     *
+     * <p>NOTE(review): every environment variable is copied verbatim, without any masking of
+     * sensitive values; confirm this is acceptable for the endpoints exposing it.
+     *
+     * @return a new instance built from {@link System#getenv()}, the JVM information and the
+     *     entries of the {@code java.class.path} system property
+     */
+    public static EnvironmentInfo create() {
+        List<EnvironmentVariableItem> environmentVariableItems = new ArrayList<>();
+        System.getenv()
+                .forEach(
+                        (key, value) ->
+                                environmentVariableItems.add(
+                                        new EnvironmentVariableItem(key, value)));
+
+        // Split on the platform separator (';' on Windows, ':' elsewhere): a hard-coded ':'
+        // leaves Windows entries unsplit and cuts drive-letter paths such as "C:\lib" in two.
+        return new EnvironmentInfo(
+                environmentVariableItems,
+                JVMInfo.create(),
+                Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
+    }
+
+    /** A single key-value pair entry in the {@link EnvironmentInfo} response. */
+    private static class EnvironmentVariableItem {
+        private static final String FIELD_NAME_KEY = "key";
+
+        private static final String FIELD_NAME_VALUE = "value";
+
+        @JsonProperty(FIELD_NAME_KEY)
+        private final String key;
+
+        @JsonProperty(FIELD_NAME_VALUE)
+        private final String value;
+
+        @JsonCreator
+        public EnvironmentVariableItem(
+                @JsonProperty(FIELD_NAME_KEY) String key,
+                @JsonProperty(FIELD_NAME_VALUE) String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            EnvironmentVariableItem that = (EnvironmentVariableItem) o;
+            return Objects.equals(key, that.key) && Objects.equals(value, that.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value);
+        }
+    }
+
+    /** JVM information. */
+    private static class JVMInfo {
+        private static final String FIELD_NAME_VERSION = "version";
+
+        private static final String FIELD_NAME_ARCH = "arch";
+
+        private static final String FIELD_NAME_OPTIONS = "options";
+
+        @JsonProperty(FIELD_NAME_VERSION)
+        private final String version;
+
+        @JsonProperty(FIELD_NAME_ARCH)
+        private final String arch;
+
+        @JsonProperty(FIELD_NAME_OPTIONS)
+        private final List<String> options;
+
+        @JsonCreator
+        public JVMInfo(
+                @JsonProperty(FIELD_NAME_VERSION) String version,
+                @JsonProperty(FIELD_NAME_ARCH) String arch,
+                @JsonProperty(FIELD_NAME_OPTIONS) List<String> options) {
+            this.version = version;
+            this.arch = arch;
+            this.options = options;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            JVMInfo that = (JVMInfo) o;
+            return Objects.equals(version, that.version)
+                    && Objects.equals(arch, that.arch)
+                    && Objects.equals(options, that.options);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(version, arch, options);
+        }
+
+        /** Collects the JVM version, the {@code os.arch} property and the JVM startup options. */
+        private static JVMInfo create() {
+            return new JVMInfo(
+                    EnvironmentInformation.getJvmVersion(),
+                    System.getProperty("os.arch"),
+                    Arrays.asList(EnvironmentInformation.getJvmStartupOptionsArray()));
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java
new file mode 100644
index 00000000000..f6b0aca14d0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.cluster.JobManagerEnvironmentHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Message headers for the {@link JobManagerEnvironmentHandler} (GET /jobmanager/environment). */
+public class JobManagerEnvironmentHeaders
+        implements MessageHeaders<EmptyRequestBody, EnvironmentInfo, EmptyMessageParameters> {
+    private static final JobManagerEnvironmentHeaders INSTANCE = new JobManagerEnvironmentHeaders();
+
+    public static final String JOB_MANAGER_ENV_REST_PATH = "/jobmanager/environment";
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return JOB_MANAGER_ENV_REST_PATH;
+    }
+
+    @Override
+    public Class<EnvironmentInfo> getResponseClass() {
+        return EnvironmentInfo.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the jobmanager environment.";
+    }
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public EmptyMessageParameters getUnresolvedMessageParameters() {
+        return EmptyMessageParameters.getInstance();
+    }
+
+    public static JobManagerEnvironmentHeaders getInstance() {
+        return INSTANCE;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 3b21be96973..a77c3d5217a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
 import org.apache.flink.runtime.rest.handler.cluster.JobManagerCustomLogHandler;
+import org.apache.flink.runtime.rest.handler.cluster.JobManagerEnvironmentHandler;
 import org.apache.flink.runtime.rest.handler.cluster.JobManagerLogFileHandler;
 import org.apache.flink.runtime.rest.handler.cluster.JobManagerLogListHandler;
 import org.apache.flink.runtime.rest.handler.cluster.JobManagerThreadDumpHandler;
@@ -99,6 +100,7 @@ import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.JobManagerEnvironmentHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
@@ -317,6 +319,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                         ClusterConfigurationInfoHeaders.getInstance(),
                         clusterConfiguration);
 
+        JobManagerEnvironmentHandler jobManagerEnvironmentHandler =
+                new JobManagerEnvironmentHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobManagerEnvironmentHeaders.getInstance());
+
         JobConfigHandler jobConfigHandler =
                 new JobConfigHandler(
                         leaderRetriever,
@@ -660,6 +669,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                 Tuple2.of(
                         clusterConfigurationHandler.getMessageHeaders(),
                         clusterConfigurationHandler));
+        handlers.add(
+                Tuple2.of(
+                        jobManagerEnvironmentHandler.getMessageHeaders(),
+                        jobManagerEnvironmentHandler));
         handlers.add(Tuple2.of(dashboardConfigHandler.getMessageHeaders(), dashboardConfigHandler));
         handlers.add(Tuple2.of(jobIdsHandler.getMessageHeaders(), jobIdsHandler));
         handlers.add(Tuple2.of(jobStatusHandler.getMessageHeaders(), jobStatusHandler));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/EnvironmentInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/EnvironmentInfoTest.java
new file mode 100644
index 00000000000..6b8696c06ac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/EnvironmentInfoTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/** Test for {@link EnvironmentInfo}; it only supplies the response class and a sample instance. */
+public class EnvironmentInfoTest extends RestResponseMarshallingTestBase<EnvironmentInfo> {
+    @Override
+    protected Class<EnvironmentInfo> getTestResponseClass() {
+        return EnvironmentInfo.class; // response type handed to the base class
+    }
+
+    @Override
+    protected EnvironmentInfo getTestResponseInstance() {
+        return EnvironmentInfo.create(); // NOTE(review): presumably reflects the JVM running the test - confirm
+    }
+}


[flink] 04/04: [FLINK-28311][rest] Introduce JobManagerJobEnvironmentHandler

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit adfe97deb5cea83c9c0b2af6dc295c6aa33ab96c
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Mon Jul 4 15:10:31 2022 +0800

    [FLINK-28311][rest] Introduce JobManagerJobEnvironmentHandler
    
    This closes #20150.
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 106 +++++++++++++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       |  18 ++++
 .../src/test/resources/rest_api_v1.snapshot        |  61 ++++++++++++
 .../job/JobManagerJobEnvironmentHandler.java       |  82 ++++++++++++++++
 .../job/JobManagerJobEnvironmentHeaders.java       |  80 ++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  13 +++
 6 files changed, 360 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 64ebceeae89..3fb81e43024 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3097,6 +3097,112 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/jobmanager/environment</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns the jobmanager's environment variables of a specific job.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Request</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Response</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo",
+  "properties" : {
+    "classpath" : {
+      "type" : "array",
+      "items" : {
+        "type" : "string"
+      }
+    },
+    "environment" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:EnvironmentVariableItem",
+        "properties" : {
+          "key" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    },
+    "jvm" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:JVMInfo",
+      "properties" : {
+        "arch" : {
+          "type" : "string"
+        },
+        "options" : {
+          "type" : "array",
+          "items" : {
+            "type" : "string"
+          }
+        },
+        "version" : {
+          "type" : "string"
+        }
+      }
+    }
+  }
+}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index ecb6b964f33..b495a75290b 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -659,6 +659,24 @@ paths:
                     type: boolean
                 items:
                   $ref: '#/components/schemas/ConfigurationInfoEntry'
+  /jobs/{jobid}/jobmanager/environment:
+    get:
+      description: Returns the jobmanager's environment variables of a specific job.
+      operationId: getJobManagerJobEnvironment
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a job.
+        required: true
+        schema:
+          $ref: '#/components/schemas/JobID'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvironmentInfo'
   /jobs/{jobid}/metrics:
     get:
       description: Provides access to job metrics.
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 87aedec6d18..2de7c030c29 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1918,6 +1918,67 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/jobmanager/environment",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "any"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo",
+      "properties" : {
+        "environment" : {
+          "type" : "array",
+          "items" : {
+            "type" : "object",
+            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:EnvironmentVariableItem",
+            "properties" : {
+              "key" : {
+                "type" : "string"
+              },
+              "value" : {
+                "type" : "string"
+              }
+            }
+          }
+        },
+        "jvm" : {
+          "type" : "object",
+          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:JVMInfo",
+          "properties" : {
+            "version" : {
+              "type" : "string"
+            },
+            "arch" : {
+              "type" : "string"
+            },
+            "options" : {
+              "type" : "array",
+              "items" : {
+                "type" : "string"
+              }
+            }
+          }
+        },
+        "classpath" : {
+          "type" : "array",
+          "items" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
   }, {
     "url" : "/jobs/:jobid/metrics",
     "method" : "GET",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobEnvironmentHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobEnvironmentHandler.java
new file mode 100644
index 00000000000..ba2ff0ba494
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobEnvironmentHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EnvironmentInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobEnvironmentHeaders;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler for the job-scoped jobmanager environment endpoint; also archives the same payload per job. */
+public class JobManagerJobEnvironmentHandler
+        extends AbstractRestHandler<
+                RestfulGateway, EmptyRequestBody, EnvironmentInfo, JobMessageParameters>
+        implements JsonArchivist {
+    private final EnvironmentInfo environmentInfo; // built once in the constructor, reused for requests and archiving
+
+    public JobManagerJobEnvironmentHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, EnvironmentInfo, JobMessageParameters>
+                    messageHeaders) {
+        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+        this.environmentInfo = EnvironmentInfo.create(); // created at handler construction, not per request
+    }
+
+    @Override
+    protected CompletableFuture<EnvironmentInfo> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
+            throws RestHandlerException {
+        return CompletableFuture.completedFuture(environmentInfo); // same object for every job id: request and gateway are not consulted
+    }
+
+    @Override
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
+            throws IOException {
+        return Collections.singletonList( // exactly one archive entry per job
+                new ArchivedJson(
+                        JobManagerJobEnvironmentHeaders.getInstance()
+                                .getTargetRestEndpointURL()
+                                .replace( // swap the path placeholder for the concrete job id
+                                        ':' + JobIDPathParameter.KEY,
+                                        executionGraphInfo.getJobId().toString()),
+                        environmentInfo));
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobEnvironmentHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobEnvironmentHeaders.java
new file mode 100644
index 00000000000..628b3ceb818
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobEnvironmentHeaders.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EnvironmentInfo;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Singleton {@link MessageHeaders} describing {@code GET /jobs/:jobid/jobmanager/environment} for
+ * {@link org.apache.flink.runtime.rest.handler.job.JobManagerJobEnvironmentHandler}.
+ */
+public class JobManagerJobEnvironmentHeaders
+        implements MessageHeaders<EmptyRequestBody, EnvironmentInfo, JobMessageParameters> {
+    private static final JobManagerJobEnvironmentHeaders INSTANCE = // sole instance; see getInstance()
+            new JobManagerJobEnvironmentHeaders();
+
+    public static final String JOBMANAGER_JOB_ENV_REST_PATH = "/jobs/:jobid/jobmanager/environment"; // ":jobid" is a path placeholder
+
+    private JobManagerJobEnvironmentHeaders() {} // private: instances are obtained via getInstance()
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return JOBMANAGER_JOB_ENV_REST_PATH;
+    }
+
+    @Override
+    public Class<EnvironmentInfo> getResponseClass() {
+        return EnvironmentInfo.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public JobMessageParameters getUnresolvedMessageParameters() {
+        return new JobMessageParameters(); // a fresh parameter object on every call
+    }
+
+    public static JobManagerJobEnvironmentHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the jobmanager's environment variables of a specific job."; // NOTE(review): the documented response schema also has "jvm" and "classpath" - confirm wording
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index b3fead6af48..cf5ad90abcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler;
 import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler;
+import org.apache.flink.runtime.rest.handler.job.JobManagerJobEnvironmentHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobStatusHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
@@ -125,6 +126,7 @@ import org.apache.flink.runtime.rest.messages.cluster.JobManagerThreadDumpHeader
 import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobEnvironmentHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
@@ -328,6 +330,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                         responseHeaders,
                         JobManagerEnvironmentHeaders.getInstance());
 
+        JobManagerJobEnvironmentHandler jobManagerJobEnvironmentHandler =
+                new JobManagerJobEnvironmentHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobManagerJobEnvironmentHeaders.getInstance());
+
         JobConfigHandler jobConfigHandler =
                 new JobConfigHandler(
                         leaderRetriever,
@@ -683,6 +692,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                 Tuple2.of(
                         jobManagerEnvironmentHandler.getMessageHeaders(),
                         jobManagerEnvironmentHandler));
+        handlers.add(
+                Tuple2.of(
+                        jobManagerJobEnvironmentHandler.getMessageHeaders(),
+                        jobManagerJobEnvironmentHandler));
         handlers.add(Tuple2.of(dashboardConfigHandler.getMessageHeaders(), dashboardConfigHandler));
         handlers.add(Tuple2.of(jobIdsHandler.getMessageHeaders(), jobIdsHandler));
         handlers.add(Tuple2.of(jobStatusHandler.getMessageHeaders(), jobStatusHandler));