Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/08/07 14:32:42 UTC

[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4492

    [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

    ## What is the purpose of the change
    
    This PR decouples the `WebRuntimeMonitor` from the `ActorGateway` by introducing the `JobManagerGateway` interface which can have multiple implementations. This is a preliminary step for the integration of the existing `WebRuntimeMonitor` with the Flip-6 `JobMaster`.
    
    This PR is based on #4486.
    
    ## Brief change log
    
    - Extending the `JobManagerGateway` with methods to cover the requirements of the `WebRuntimeMonitor`
    - Change `JobManagerRetriever` to return `JobManagerGateway` instead of `Tuple2<ActorGateway, Integer>`
    - Adapt handlers to use the `JobManagerRetriever`
    - Introduce `MetricQueryServiceRetriever` to retrieve `MetricQueryService` implementations running alongside the JM and TM
    - Introduce `MetricQueryServiceGateway` to abstract the implementation details of the `MetricQueryService` (e.g. Akka based)
    - Pass `MetricQueryServiceRetriever` to `WebRuntimeMonitor` and `MetricFetcher`
    - Adapt test classes to work with newly introduced interfaces
    - Add `web.timeout` configuration option to control the WebRuntimeMonitor timeouts (see `WebMonitorOptions#WEB_TIMEOUT`)
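
To illustrate the decoupling listed above, here is a minimal sketch, not the PR's actual code. `getAddress` and `getJobManagerGatewayNow` are names that appear in the PR diff; `requestRunningJobCount`, `InMemoryGateway`, and `describe` are hypothetical and exist only for this example. The idea is that a handler depends on a retriever and a gateway interface, so an Akka-backed or a Flip-6 implementation can presumably be swapped in without touching the handler.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class GatewaySketch {

    /** Trimmed-down, hypothetical gateway: callers see futures, not actors. */
    interface JobManagerGateway {
        String getAddress();

        // Hypothetical method, invented for this sketch only.
        CompletableFuture<Integer> requestRunningJobCount();
    }

    /** One possible implementation; an Akka-based one would wrap an actor reference instead. */
    static final class InMemoryGateway implements JobManagerGateway {
        private final String address;
        private final int runningJobs;

        InMemoryGateway(String address, int runningJobs) {
            this.address = address;
            this.runningJobs = runningJobs;
        }

        @Override
        public String getAddress() {
            return address;
        }

        @Override
        public CompletableFuture<Integer> requestRunningJobCount() {
            return CompletableFuture.completedFuture(runningJobs);
        }
    }

    /** Returns the currently known gateway, or empty if none is available. */
    interface JobManagerRetriever {
        Optional<JobManagerGateway> getJobManagerGatewayNow();
    }

    /** A handler that only depends on the retriever, never on a concrete gateway type. */
    static String describe(JobManagerRetriever retriever) {
        return retriever.getJobManagerGatewayNow()
            .map(gw -> gw.getAddress() + " runs " + gw.requestRunningJobCount().join() + " job(s)")
            .orElse("no JobManager available");
    }

    public static void main(String[] args) {
        JobManagerRetriever known = () -> Optional.of(new InMemoryGateway("jobmanager-address", 2));
        JobManagerRetriever unknown = Optional::empty;

        System.out.println(describe(known));
        System.out.println(describe(unknown));
    }
}
```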
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink webMonitorFlip6

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4492.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4492
    
----
commit 50d304647e65be90937809d71cef87608f60b9ce
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-04T22:28:15Z

    [FLINK-7372] [JobGraph] Remove ActorGateway from JobGraph
    
    The JobGraph has an unnecessary dependency on the ActorGateway via its JobGraph#uploadUserJars method. In
    order to get rid of this dependency for future Flip-6 changes, this commit retrieves the BlobServer's
    address beforehand and directly passes it to this method.

commit e4596e060b471464064de142d16d86c0a52ca078
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-06T15:56:41Z

    [FLINK-7375] Replace ActorGateway with JobManagerGateway in JobClient
    
    In order to make the JobClient code independent of Akka, this PR replaces the
    ActorGateway parameters by JobManagerGateway. AkkaJobManagerGateway is the
    respective implementation of the JobManagerGateway for Akka. Moreover, this
    PR introduces a useful ExceptionUtils method for handling Future exceptions.
    Additionally, the SerializedThrowable has been moved to flink-core.

commit c88b83db49d0f6f45578e7563e3dd7e28c3a24d3
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-02T16:43:00Z

    [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132490977
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java ---
    @@ -149,6 +149,13 @@
     			.defaultValue(50)
     			.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples");
     
    +	/**
    +	 * Timeout for asynchronous operations by the WebRuntimeMonitor
    +	 */
    +	public static final ConfigOption<Long> TIMEOUT = ConfigOptions
    --- End diff --
    
    True. Will add it.



[GitHub] flink issue #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4492
  
    Thanks for the review @zentol. I'm rebasing onto the latest master, and if Travis gives the green light, I'll merge this PR.



[GitHub] flink issue #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4492
  
    Thanks for the review @zentol. I've addressed most of your comments modulo the web frontend configuration options and rebased the PR onto the latest master.
    
    I've tested the web frontend locally with a standalone cluster but not with Yarn and HA. Will try these configurations out as well.



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132141613
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/WebMonitorOptions.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.configuration;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +
    +/**
    + * Configuration options for the WebRuntimeMonitor
    + */
    +@PublicEvolving
    +public class WebMonitorOptions {
    --- End diff --
    
    True, but the timeout option is not job manager specific. In the future there will be multiple web frontend components running (one running next to the dispatcher, another one running next to a JobManager). Therefore, I thought about introducing general options for the web monitor.



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132142562
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -247,6 +262,8 @@ public static Path validateAndNormalizeUri(URI archiveDirUri) {
     		return new Path(archiveDirUri);
     	}
     
    +
    +
    --- End diff --
    
    Will do.



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132141346
  
    --- Diff: docs/setup/config.md ---
    @@ -389,6 +389,10 @@ These parameters allow for advanced tuning. The default values are sufficient wh
     
     - `jobmanager.web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.
     
    +### Web Frontend
    --- End diff --
    
    True, but this is the `JobManager Web Frontend`. In the future (with Flip-6) there will be more than one web frontend component, and these are not necessarily tied to the `JobManager`. Therefore, I thought about introducing some general web frontend configuration options. What do you think?



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r131671522
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/WebMonitorOptions.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.configuration;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +
    +/**
    + * Configuration options for the WebRuntimeMonitor
    + */
    +@PublicEvolving
    +public class WebMonitorOptions {
    --- End diff --
    
    There is a section for web monitor options in the `JobManagerOptions` class.



[GitHub] flink issue #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorG...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4492
  
    Thanks for testing things on yarn/HA.
    
    As for the web monitor options, I do see the point of not coupling new options to the jobmanager. However, I think it's a bad idea to have some be tied to the jobmanager while others aren't. This smells like a repeat of the ConfigOption introduction, where we approached things with a "let's refactor the rest later" mentality, and that still isn't complete. I suggest porting all webmonitor options first (with deprecated keys, obviously), and then rebasing the PR on top of that.
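
A rough sketch of what porting an option "with deprecated keys" could look like, written with hypothetical stand-in classes rather than Flink's own `ConfigOption` API. The deprecated key and the default of 50 come from the `WebOptions` diff context in this thread; the new key name `web.backpressure.delay-between-samples` is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DeprecatedKeySketch {

    /** Hypothetical option: a current key plus older keys that are still honored. */
    static final class IntOption {
        final String key;
        final int defaultValue;
        final List<String> deprecatedKeys;

        IntOption(String key, int defaultValue, String... deprecatedKeys) {
            this.key = key;
            this.defaultValue = defaultValue;
            this.deprecatedKeys = Arrays.asList(deprecatedKeys);
        }
    }

    /** Looks up the current key first, then each deprecated key, then falls back to the default. */
    static int resolve(Map<String, String> config, IntOption option) {
        String raw = config.get(option.key);
        for (String oldKey : option.deprecatedKeys) {
            if (raw != null) {
                break;
            }
            raw = config.get(oldKey);
        }
        return raw != null ? Integer.parseInt(raw) : option.defaultValue;
    }

    // New key name assumed for this sketch; the deprecated key is taken from the diff context.
    static final IntOption DELAY = new IntOption(
        "web.backpressure.delay-between-samples",
        50,
        "jobmanager.web.backpressure.delay-between-samples");

    public static void main(String[] args) {
        Map<String, String> oldStyleConfig = new HashMap<>();
        oldStyleConfig.put("jobmanager.web.backpressure.delay-between-samples", "75");

        // An existing configuration that still uses the old key keeps working.
        System.out.println(resolve(oldStyleConfig, DELAY));
        // With no key set at all, the default applies.
        System.out.println(resolve(new HashMap<>(), DELAY));
    }
}
```

Checking the current key before the deprecated ones means a user who sets both gets the new value, which seems like the least surprising behavior during a migration.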



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132142385
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---
    @@ -101,127 +98,112 @@ public void update() {
     
     	private void fetchMetrics() {
     		try {
    -			Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    -			if (jobManagerGatewayAndWebPort.isDefined()) {
    -				ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
    +			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
    +			if (optJobManagerGateway.isPresent()) {
    +				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
     
     				/**
     				 * Remove all metrics that belong to a job that is not running and no longer archived.
     				 */
    -				Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout);
    -				jobDetailsFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							MultipleJobsDetails details = (MultipleJobsDetails) result;
    +				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
    +
    +				jobDetailsFuture.whenCompleteAsync(
    +					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching of JobDetails failed.", throwable);
    +						} else {
     							ArrayList<String> toRetain = new ArrayList<>();
    -							for (JobDetails job : details.getRunningJobs()) {
    +							for (JobDetails job : jobDetails.getRunningJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
    -							for (JobDetails job : details.getFinishedJobs()) {
    +							for (JobDetails job : jobDetails.getFinishedJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
     							synchronized (metrics) {
     								metrics.jobs.keySet().retainAll(toRetain);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
    +					},
    +					executor);
     
    -				String jobManagerPath = jobManager.path();
    -				String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
    -				ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);
    +				String jobManagerPath = jobManagerGateway.getAddress();
    +				String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
     
    -				queryMetrics(jobManagerQueryService);
    +				retrieveAndQueryMetrics(jmQueryServicePath);
     
     				/**
     				 * We first request the list of all registered task managers from the job manager, and then
     				 * request the respective metric dump from each task manager.
     				 *
     				 * <p>All stored metrics that do not belong to a registered task manager will be removed.
     				 */
    -				Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
    -				registeredTaskManagersFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable();
    -							List<String> activeTaskManagers = new ArrayList<>();
    -							for (Instance taskManager : taskManagers) {
    -								activeTaskManagers.add(taskManager.getId().toString());
    -
    -								String taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
    -								String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
    -								ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);
    -
    -								queryMetrics(taskManagerQueryService);
    -							}
    -							synchronized (metrics) { // remove all metrics belonging to unregistered task managers
    +				CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
    +
    +				taskManagersFuture.whenCompleteAsync(
    +					(Collection<Instance> taskManagers, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
    +						} else {
    +							List<String> activeTaskManagers = taskManagers.stream().map(
    +								taskManagerInstance -> {
    +									final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
    +									final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
    +
    +									retrieveAndQueryMetrics(tmQueryServicePath);
    +
    +									return taskManagerInstance.getId().toString();
    +								}).collect(Collectors.toList());
    +
    +							synchronized (metrics) {
     								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed.");
    +					},
    +					executor);
     			}
     		} catch (Exception e) {
     			LOG.warn("Exception while fetching metrics.", e);
     		}
     	}
     
    -	private void logErrorOnFailure(Future<Object> future, final String message) {
    -		future.onFailure(new OnFailure() {
    -			@Override
    -			public void onFailure(Throwable failure) throws Throwable {
    -				LOG.debug(message, failure);
    -			}
    -		}, ctx);
    -	}
    -
     	/**
    -	 * Requests a metric dump from the given actor.
    +	 * Retrieves and queries the specified QueryServiceGateway.
     	 *
    -	 * @param actor ActorRef to request the dump from
    -     */
    -	private void queryMetrics(ActorRef actor) {
    -		Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
    -		metricQueryFuture
    -			.onSuccess(new OnSuccess<Object>() {
    -				@Override
    -				public void onSuccess(Object result) throws Throwable {
    -					addMetrics(result);
    +	 * @param queryServicePath specifying the QueryServiceGateway
    +	 */
    +	private void retrieveAndQueryMetrics(String queryServicePath) {
    +		final CompletableFuture<MetricQueryServiceGateway> jmQueryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
    +
    +		jmQueryServiceGatewayFuture.whenCompleteAsync(
    +			(MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
    +				if (t != null) {
    +					LOG.debug("Could not retrieve QueryServiceGateway.", t);
    +				} else {
    +					queryMetrics(queryServiceGateway);
     				}
    -			}, ctx);
    -		logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
    -	}
    -
    -	private void addMetrics(Object result) {
    -		MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
    -		List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
    -		for (MetricDump metric : dumpedMetrics) {
    -			metrics.add(metric);
    -		}
    +			},
    +			executor);
     	}
     
     	/**
    -	 * Helper class that allows mocking of the answer.
    -     */
    -	static class BasicGateway {
    -		private final ActorRef actor;
    -
    -		private BasicGateway(ActorRef actor) {
    -			this.actor = actor;
    -		}
    -
    -		/**
    -		 * Sends a message asynchronously and returns its response. The response to the message is
    -		 * returned as a future.
    -		 *
    -		 * @param message Message to be sent
    -		 * @param timeout Timeout until the Future is completed with an AskTimeoutException
    -		 * @return Future which contains the response to the sent message
    -		 */
    -		public Future<Object> ask(Object message, FiniteDuration timeout) {
    -			return Patterns.ask(actor, message, new Timeout(timeout));
    -		}
    +	 * Query the metrics from the given QueryServiceGateway.
    +	 *
    +	 * @param queryServiceGateway to query for metrics
    +	 */
    +	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
    +		queryServiceGateway
    +			.queryMetrics(timeout)
    +			.whenCompleteAsync(
    +				(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
    +					if (t != null) {
    +						LOG.debug("Fetching metrics failed.", t);
    +					} else {
    +						List<MetricDump> dumpedMetrics = deserializer.deserialize(result);
    +						for (MetricDump metric : dumpedMetrics) {
    --- End diff --
    
    Is this really how FLINK-7368 will be solved? I thought there was still some discussion ongoing.
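
For readers following the diff above: it replaces the Akka `onSuccess`/`onFailure` callback pairs with a single `CompletableFuture.whenCompleteAsync` callback that receives both the result and the throwable. Below is a minimal, self-contained sketch of that pattern. The `track` helper, the log list, and the same-thread executor are hypothetical and not part of the PR; they only make the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class WhenCompleteSketch {

    /** Records either the result or the failure; one callback covers both outcomes. */
    static CompletableFuture<String> track(
            CompletableFuture<String> future, List<String> log, Executor executor) {
        return future.whenCompleteAsync(
            (String result, Throwable throwable) -> {
                if (throwable != null) {
                    log.add("failed: " + throwable.getMessage());
                } else {
                    log.add("ok: " + result);
                }
            },
            executor);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Runs callbacks on the calling thread so the demo needs no thread pool.
        Executor direct = Runnable::run;

        track(CompletableFuture.completedFuture("job details"), log, direct);

        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new RuntimeException("timeout"));
        track(failing, log, direct);

        System.out.println(log);
    }
}
```

Compared with separate success and failure callbacks, the single callback makes it harder to forget the failure path, at the cost of an explicit null check on the throwable.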



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132427008
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java ---
    @@ -149,6 +149,13 @@
     			.defaultValue(50)
     			.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples");
     
    +	/**
    +	 * Timeout for asynchronous operations by the WebRuntimeMonitor
    +	 */
    +	public static final ConfigOption<Long> TIMEOUT = ConfigOptions
    --- End diff --
    
    We should document the time-unit in some way.
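
One way the time unit could be documented, sketched with a hypothetical stand-in class instead of Flink's `ConfigOption`. The `web.timeout` key comes from the PR description; the default of 10 000 ms is an assumed value for this example, and `LongOption` and `toDuration` are invented names.

```java
import java.time.Duration;

class TimeoutOptionSketch {

    /** Hypothetical stand-in for a typed config option: a key plus a default value. */
    static final class LongOption {
        final String key;
        final long defaultValue;

        LongOption(String key, long defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /**
     * Timeout for asynchronous operations by the web monitor, in milliseconds.
     * (The default used here is an assumption made for this sketch.)
     */
    static final LongOption TIMEOUT = new LongOption("web.timeout", 10_000L);

    /** Converts the raw value to a Duration at the boundary so the unit cannot be misread later. */
    static Duration toDuration(long millis) {
        return Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        System.out.println(TIMEOUT.key + " = " + toDuration(TIMEOUT.defaultValue));
    }
}
```

Stating the unit in the Javadoc covers readers of the code; mentioning it in the user-facing configuration docs as well would cover people who only edit the config file.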



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132155748
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---
    @@ -101,127 +98,112 @@ public void update() {
     
     	private void fetchMetrics() {
     		try {
    -			Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    -			if (jobManagerGatewayAndWebPort.isDefined()) {
    -				ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
    +			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
    +			if (optJobManagerGateway.isPresent()) {
    +				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
     
     				/**
     				 * Remove all metrics that belong to a job that is not running and no longer archived.
     				 */
    -				Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout);
    -				jobDetailsFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							MultipleJobsDetails details = (MultipleJobsDetails) result;
    +				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
    +
    +				jobDetailsFuture.whenCompleteAsync(
    +					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching of JobDetails failed.", throwable);
    +						} else {
     							ArrayList<String> toRetain = new ArrayList<>();
    -							for (JobDetails job : details.getRunningJobs()) {
    +							for (JobDetails job : jobDetails.getRunningJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
    -							for (JobDetails job : details.getFinishedJobs()) {
    +							for (JobDetails job : jobDetails.getFinishedJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
     							synchronized (metrics) {
     								metrics.jobs.keySet().retainAll(toRetain);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
    +					},
    +					executor);
     
    -				String jobManagerPath = jobManager.path();
    -				String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
    -				ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);
    +				String jobManagerPath = jobManagerGateway.getAddress();
    +				String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
     
    -				queryMetrics(jobManagerQueryService);
    +				retrieveAndQueryMetrics(jmQueryServicePath);
     
     				/**
     				 * We first request the list of all registered task managers from the job manager, and then
     				 * request the respective metric dump from each task manager.
     				 *
     				 * <p>All stored metrics that do not belong to a registered task manager will be removed.
     				 */
    -				Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
    -				registeredTaskManagersFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable();
    -							List<String> activeTaskManagers = new ArrayList<>();
    -							for (Instance taskManager : taskManagers) {
    -								activeTaskManagers.add(taskManager.getId().toString());
    -
    -								String taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
    -								String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
    -								ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);
    -
    -								queryMetrics(taskManagerQueryService);
    -							}
    -							synchronized (metrics) { // remove all metrics belonging to unregistered task managers
    +				CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
    +
    +				taskManagersFuture.whenCompleteAsync(
    +					(Collection<Instance> taskManagers, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
    +						} else {
    +							List<String> activeTaskManagers = taskManagers.stream().map(
    +								taskManagerInstance -> {
    +									final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
    +									final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
    +
    +									retrieveAndQueryMetrics(tmQueryServicePath);
    +
    +									return taskManagerInstance.getId().toString();
    +								}).collect(Collectors.toList());
    +
    +							synchronized (metrics) {
     								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed.");
    +					},
    +					executor);
     			}
     		} catch (Exception e) {
     			LOG.warn("Exception while fetching metrics.", e);
     		}
     	}
     
    -	private void logErrorOnFailure(Future<Object> future, final String message) {
    -		future.onFailure(new OnFailure() {
    -			@Override
    -			public void onFailure(Throwable failure) throws Throwable {
    -				LOG.debug(message, failure);
    -			}
    -		}, ctx);
    -	}
    -
     	/**
    -	 * Requests a metric dump from the given actor.
    +	 * Retrieves and queries the specified QueryServiceGateway.
     	 *
    -	 * @param actor ActorRef to request the dump from
    -     */
    -	private void queryMetrics(ActorRef actor) {
    -		Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
    -		metricQueryFuture
    -			.onSuccess(new OnSuccess<Object>() {
    -				@Override
    -				public void onSuccess(Object result) throws Throwable {
    -					addMetrics(result);
    +	 * @param queryServicePath specifying the QueryServiceGateway
    +	 */
    +	private void retrieveAndQueryMetrics(String queryServicePath) {
    +		final CompletableFuture<MetricQueryServiceGateway> jmQueryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
    +
    +		jmQueryServiceGatewayFuture.whenCompleteAsync(
    +			(MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
    +				if (t != null) {
    +					LOG.debug("Could not retrieve QueryServiceGateway.", t);
    +				} else {
    +					queryMetrics(queryServiceGateway);
     				}
    -			}, ctx);
    -		logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
    -	}
    -
    -	private void addMetrics(Object result) {
    -		MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
    -		List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
    -		for (MetricDump metric : dumpedMetrics) {
    -			metrics.add(metric);
    -		}
    +			},
    +			executor);
     	}
     
     	/**
    -	 * Helper class that allows mocking of the answer.
    -     */
    -	static class BasicGateway {
    -		private final ActorRef actor;
    -
    -		private BasicGateway(ActorRef actor) {
    -			this.actor = actor;
    -		}
    -
    -		/**
    -		 * Sends a message asynchronously and returns its response. The response to the message is
    -		 * returned as a future.
    -		 *
    -		 * @param message Message to be sent
    -		 * @param timeout Timeout until the Future is completed with an AskTimeoutException
    -		 * @return Future which contains the response to the sent message
    -		 */
    -		public Future<Object> ask(Object message, FiniteDuration timeout) {
    -			return Patterns.ask(actor, message, new Timeout(timeout));
    -		}
    +	 * Query the metrics from the given QueryServiceGateway.
    +	 *
    +	 * @param queryServiceGateway to query for metrics
    +	 */
    +	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
    +		queryServiceGateway
    +			.queryMetrics(timeout)
    +			.whenCompleteAsync(
    +				(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
    +					if (t != null) {
    +						LOG.debug("Fetching metrics failed.", t);
    +					} else {
    +						List<MetricDump> dumpedMetrics = deserializer.deserialize(result);
    +						for (MetricDump metric : dumpedMetrics) {
    --- End diff --
    
    Yes, there's still a discussion about it, but that's not an argument against fixing it now to make it behave the way it was originally intended to.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorG...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4492
  
    This PR also needs a rebase.



[GitHub] flink issue #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4492
  
    @zentol I've created the [WebOptions PR](#4512) and rebased onto that as you've requested.



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r131680908
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -247,6 +262,8 @@ public static Path validateAndNormalizeUri(URI archiveDirUri) {
     		return new Path(archiveDirUri);
     	}
     
    +
    +
    --- End diff --
    
    remove empty lines



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r131683814
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/WebMonitorOptions.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.configuration;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +
    +/**
    + * Configuration options for the WebRuntimeMonitor
    + */
    +@PublicEvolving
    +public class WebMonitorOptions {
    +
    +	/**
    +	 * Timeout for asynchronous operations by the WebRuntimeMonitor
    +	 */
    +	public static final ConfigOption<Long> WEB_TIMEOUT = ConfigOptions
    +		.key("web.timeout")
    --- End diff --
    
    this config key is inconsistent with the existing web monitor options.



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r131671417
  
    --- Diff: docs/setup/config.md ---
    @@ -389,6 +389,10 @@ These parameters allow for advanced tuning. The default values are sufficient wh
     
     - `jobmanager.web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.
     
    +### Web Frontend
    --- End diff --
    
    there is already a heading for the web frontend



[GitHub] flink issue #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4492
  
    I did the additional tests running Flink on Yarn with and without HA and the web frontend worked.



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132143993
  
    --- Diff: docs/setup/config.md ---
    @@ -389,6 +389,10 @@ These parameters allow for advanced tuning. The default values are sufficient wh
     
     - `jobmanager.web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.
     
    +### Web Frontend
    --- End diff --
    
    I think that having some options tied to the jobmanager while others aren't will just cause more work down the line. Either add it to the jobmanager options now and move it in a follow-up, or separate the options now and rebase the PR on top of it.
    
    The "let's refactor the rest later" mentality just doesn't work for us, see the translation to ConfigOptions which _still isn't done_.
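
As a rough illustration of the second alternative (separating the options now), a lookup could read the new key first and fall back to a jobmanager-prefixed key so existing configurations keep working. This is only a sketch under assumptions: `web.timeout` is the key from the diff, but `jobmanager.web.timeout`, the default of 10000 ms, and the `resolveTimeout` helper are hypothetical and not taken from Flink.

```java
import java.util.HashMap;
import java.util.Map;

class WebTimeoutLookupSketch {

    // Key introduced in the PR under review.
    static final String NEW_KEY = "web.timeout";
    // Hypothetical legacy key, assumed here only to show the fallback.
    static final String LEGACY_KEY = "jobmanager.web.timeout";
    // Assumed default; the real default is not shown in the quoted diff.
    static final long DEFAULT_TIMEOUT_MS = 10_000L;

    /** Resolves the timeout: new key wins, then the legacy key, then the default. */
    static long resolveTimeout(Map<String, String> config) {
        String value = config.get(NEW_KEY);
        if (value == null) {
            value = config.get(LEGACY_KEY);
        }
        return value == null ? DEFAULT_TIMEOUT_MS : Long.parseLong(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(LEGACY_KEY, "5000");
        System.out.println("legacy only: " + resolveTimeout(config));

        config.put(NEW_KEY, "2000");
        System.out.println("both set:    " + resolveTimeout(config));
    }
}
```

With a fallback like this, the options can be split in one PR without forcing users to rename keys at the same time.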



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132142466
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java ---
    @@ -63,4 +70,103 @@
     	 * @return Future containing an Acknowledge message if the submission succeeded
     	 */
     	CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout);
    +
    +	/**
    +	 * Cancels the given job after taking a savepoint and returning its path.
    +	 *
    +	 * If the savepointPath is null, then the JobManager will use the default savepoint directory
    +	 * to store the savepoint in. After the savepoint has been taken and the job has been canceled
    +	 * successfully, the path of the savepoint is returned.
    +	 *
    +	 * @param jobId identifying the job to cancel
    +	 * @param savepointPath Optional path for the savepoint to be stored under; if null, then the default path is
    +	 *                      taken
    +	 * @param timeout for the asynchronous operation
    +	 * @return Future containing the savepoint path of the taken savepoint or an Exception if the operation failed
    +	 */
    +	CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable String savepointPath, Time timeout);
    +
    +	/**
    +	 * Cancels the given job.
    +	 *
    +	 * @param jobId identifying the job to cancel
    +	 * @param timeout for the asynchronous operation
    +	 * @return Future containing Acknowledge or an Exception if the operation failed
    +	 */
    +	CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout);
    +
    +	/**
    +	 * Stops the given job.
    +	 *
    +	 * @param jobId identifying the job to cancel
    +	 * @param timeout for the asynchronous operation
    +	 * @return Future containing Acknowledge or an Exception if the operation failed
    +	 */
    +	CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout);
    +
    +	/**
    +	 * Requests the class loading properties for the given JobID.
    +	 *
    +	 * @param jobId for which the class loading properties are requested
    +	 * @param timeout for this operation
    +	 * @return Future containing the optional class loading properties if they could be retrieved from the JobManager.
    +	 */
    +	CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);
    +
    +	/**
    +	 * Requests the TaksManager instance registered under the given instanceId from the JobManager.
    --- End diff --
    
    Thanks for catching.



[GitHub] flink issue #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorG...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4492
  
    Alright, I'll port the web monitor options and rebase this PR onto that.



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r131680691
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java ---
    @@ -63,4 +70,103 @@
     	 * @return Future containing an Acknowledge message if the submission succeeded
     	 */
     	CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout);
    +
    +	/**
    +	 * Cancels the given job after taking a savepoint and returning its path.
    +	 *
    +	 * If the savepointPath is null, then the JobManager will use the default savepoint directory
    +	 * to store the savepoint in. After the savepoint has been taken and the job has been canceled
    +	 * successfully, the path of the savepoint is returned.
    +	 *
    +	 * @param jobId identifying the job to cancel
    +	 * @param savepointPath Optional path for the savepoint to be stored under; if null, then the default path is
    +	 *                      taken
    +	 * @param timeout for the asynchronous operation
    +	 * @return Future containing the savepoint path of the taken savepoint or an Exception if the operation failed
    +	 */
    +	CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable String savepointPath, Time timeout);
    +
    +	/**
    +	 * Cancels the given job.
    +	 *
    +	 * @param jobId identifying the job to cancel
    +	 * @param timeout for the asynchronous operation
    +	 * @return Future containing Acknowledge or an Exception if the operation failed
    +	 */
    +	CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout);
    +
    +	/**
    +	 * Stops the given job.
    +	 *
    +	 * @param jobId identifying the job to cancel
    +	 * @param timeout for the asynchronous operation
    +	 * @return Future containing Acknowledge or an Exception if the operation failed
    +	 */
    +	CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout);
    +
    +	/**
    +	 * Requests the class loading properties for the given JobID.
    +	 *
    +	 * @param jobId for which the class loading properties are requested
    +	 * @param timeout for this operation
    +	 * @return Future containing the optional class loading properties if they could be retrieved from the JobManager.
    +	 */
    +	CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);
    +
    +	/**
    +	 * Requests the TaksManager instance registered under the given instanceId from the JobManager.
    --- End diff --
    
    typo: TaksManager -> TaskManager



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4492



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r131676491
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---
    @@ -101,127 +98,112 @@ public void update() {
     
     	private void fetchMetrics() {
     		try {
    -			Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    -			if (jobManagerGatewayAndWebPort.isDefined()) {
    -				ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
    +			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
    +			if (optJobManagerGateway.isPresent()) {
    +				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
     
     				/**
     				 * Remove all metrics that belong to a job that is not running and no longer archived.
     				 */
    -				Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout);
    -				jobDetailsFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							MultipleJobsDetails details = (MultipleJobsDetails) result;
    +				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
    +
    +				jobDetailsFuture.whenCompleteAsync(
    +					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching of JobDetails failed.", throwable);
    +						} else {
     							ArrayList<String> toRetain = new ArrayList<>();
    -							for (JobDetails job : details.getRunningJobs()) {
    +							for (JobDetails job : jobDetails.getRunningJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
    -							for (JobDetails job : details.getFinishedJobs()) {
    +							for (JobDetails job : jobDetails.getFinishedJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
     							synchronized (metrics) {
     								metrics.jobs.keySet().retainAll(toRetain);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
    +					},
    +					executor);
     
    -				String jobManagerPath = jobManager.path();
    -				String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
    -				ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);
    +				String jobManagerPath = jobManagerGateway.getAddress();
    +				String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
     
    -				queryMetrics(jobManagerQueryService);
    +				retrieveAndQueryMetrics(jmQueryServicePath);
     
     				/**
     				 * We first request the list of all registered task managers from the job manager, and then
     				 * request the respective metric dump from each task manager.
     				 *
     				 * <p>All stored metrics that do not belong to a registered task manager will be removed.
     				 */
    -				Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
    -				registeredTaskManagersFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable();
    -							List<String> activeTaskManagers = new ArrayList<>();
    -							for (Instance taskManager : taskManagers) {
    -								activeTaskManagers.add(taskManager.getId().toString());
    -
    -								String taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
    -								String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
    -								ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);
    -
    -								queryMetrics(taskManagerQueryService);
    -							}
    -							synchronized (metrics) { // remove all metrics belonging to unregistered task managers
    +				CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
    +
    +				taskManagersFuture.whenCompleteAsync(
    +					(Collection<Instance> taskManagers, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
    +						} else {
    +							List<String> activeTaskManagers = taskManagers.stream().map(
    +								taskManagerInstance -> {
    +									final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
    +									final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
    +
    +									retrieveAndQueryMetrics(tmQueryServicePath);
    +
    +									return taskManagerInstance.getId().toString();
    +								}).collect(Collectors.toList());
    +
    +							synchronized (metrics) {
     								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed.");
    +					},
    +					executor);
     			}
     		} catch (Exception e) {
     			LOG.warn("Exception while fetching metrics.", e);
     		}
     	}
     
    -	private void logErrorOnFailure(Future<Object> future, final String message) {
    -		future.onFailure(new OnFailure() {
    -			@Override
    -			public void onFailure(Throwable failure) throws Throwable {
    -				LOG.debug(message, failure);
    -			}
    -		}, ctx);
    -	}
    -
     	/**
    -	 * Requests a metric dump from the given actor.
    +	 * Retrieves and queries the specified QueryServiceGateway.
     	 *
    -	 * @param actor ActorRef to request the dump from
    -     */
    -	private void queryMetrics(ActorRef actor) {
    -		Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
    -		metricQueryFuture
    -			.onSuccess(new OnSuccess<Object>() {
    -				@Override
    -				public void onSuccess(Object result) throws Throwable {
    -					addMetrics(result);
    +	 * @param queryServicePath specifying the QueryServiceGateway
    +	 */
    +	private void retrieveAndQueryMetrics(String queryServicePath) {
    +		final CompletableFuture<MetricQueryServiceGateway> jmQueryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
    +
    +		jmQueryServiceGatewayFuture.whenCompleteAsync(
    +			(MetricQueryServiceGateway queryServiceGateway, Throwable t) -> {
    +				if (t != null) {
    +					LOG.debug("Could not retrieve QueryServiceGateway.", t);
    +				} else {
    +					queryMetrics(queryServiceGateway);
     				}
    -			}, ctx);
    -		logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
    -	}
    -
    -	private void addMetrics(Object result) {
    -		MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result;
    -		List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
    -		for (MetricDump metric : dumpedMetrics) {
    -			metrics.add(metric);
    -		}
    +			},
    +			executor);
     	}
     
     	/**
    -	 * Helper class that allows mocking of the answer.
    -     */
    -	static class BasicGateway {
    -		private final ActorRef actor;
    -
    -		private BasicGateway(ActorRef actor) {
    -			this.actor = actor;
    -		}
    -
    -		/**
    -		 * Sends a message asynchronously and returns its response. The response to the message is
    -		 * returned as a future.
    -		 *
    -		 * @param message Message to be sent
    -		 * @param timeout Timeout until the Future is completed with an AskTimeoutException
    -		 * @return Future which contains the response to the sent message
    -		 */
    -		public Future<Object> ask(Object message, FiniteDuration timeout) {
    -			return Patterns.ask(actor, message, new Timeout(timeout));
    -		}
    +	 * Query the metrics from the given QueryServiceGateway.
    +	 *
    +	 * @param queryServiceGateway to query for metrics
    +	 */
    +	private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
    +		queryServiceGateway
    +			.queryMetrics(timeout)
    +			.whenCompleteAsync(
    +				(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
    +					if (t != null) {
    +						LOG.debug("Fetching metrics failed.", t);
    +					} else {
    +						List<MetricDump> dumpedMetrics = deserializer.deserialize(result);
    +						for (MetricDump metric : dumpedMetrics) {
    --- End diff --
    
    please add a synchronized(metrics) block around this loop, see FLINK-7368.
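
For illustration, the requested change could look roughly like the sketch below. It is not the PR's code: `MetricStore`, `addAll`, and `fetchConcurrently` are hypothetical stand-ins, and plain strings replace `MetricDump`. Only the shape of the `whenCompleteAsync` callback mirrors the diff. The callbacks run on an executor with several threads, so every write to the shared store is wrapped in `synchronized (metrics)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SynchronizedMetricsSketch {

    // Hypothetical stand-in for the shared metric store; not thread-safe on its own.
    static final class MetricStore {
        final List<String> dumps = new ArrayList<>();

        void add(String metric) {
            dumps.add(metric);
        }
    }

    /** Adds all dumped metrics to the store once the future completes, holding the store's lock. */
    static CompletableFuture<List<String>> addAll(
            CompletableFuture<List<String>> dumpFuture, MetricStore metrics, Executor executor) {
        return dumpFuture.whenCompleteAsync(
            (List<String> dumpedMetrics, Throwable t) -> {
                if (t != null) {
                    System.err.println("Fetching metrics failed: " + t);
                } else {
                    // Guard the whole loop so concurrent callbacks cannot interleave their writes.
                    synchronized (metrics) {
                        for (String metric : dumpedMetrics) {
                            metrics.add(metric);
                        }
                    }
                }
            },
            executor);
    }

    /** Simulates several metric sources answering concurrently; returns the number of stored metrics. */
    static int fetchConcurrently(int sources, int metricsPerDump) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        MetricStore metrics = new MetricStore();
        try {
            List<CompletableFuture<List<String>>> pending = new ArrayList<>();
            for (int i = 0; i < sources; i++) {
                final int source = i;
                CompletableFuture<List<String>> dump = CompletableFuture.supplyAsync(() -> {
                    List<String> result = new ArrayList<>();
                    for (int j = 0; j < metricsPerDump; j++) {
                        result.add("source-" + source + ".metric-" + j);
                    }
                    return result;
                }, executor);
                pending.add(addAll(dump, metrics, executor));
            }
            // Wait until every callback has finished writing.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            executor.shutdown();
        }
        synchronized (metrics) {
            return metrics.dumps.size();
        }
    }

    public static void main(String[] args) {
        System.out.println("stored metrics: " + fetchConcurrently(8, 1000));
    }
}
```

Locking on the store itself matches the `synchronized (metrics)` blocks already placed around the `retainAll` calls in `fetchMetrics`, so additions and removals are serialized by the same monitor.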



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r131675749
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---
    @@ -101,127 +98,112 @@ public void update() {
     
     	private void fetchMetrics() {
     		try {
    -			Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    -			if (jobManagerGatewayAndWebPort.isDefined()) {
    -				ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
    +			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
    +			if (optJobManagerGateway.isPresent()) {
    +				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
     
     				/**
     				 * Remove all metrics that belong to a job that is not running and no longer archived.
     				 */
    -				Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout);
    -				jobDetailsFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							MultipleJobsDetails details = (MultipleJobsDetails) result;
    +				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
    +
    +				jobDetailsFuture.whenCompleteAsync(
    +					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching of JobDetails failed.", throwable);
    +						} else {
     							ArrayList<String> toRetain = new ArrayList<>();
    -							for (JobDetails job : details.getRunningJobs()) {
    +							for (JobDetails job : jobDetails.getRunningJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
    -							for (JobDetails job : details.getFinishedJobs()) {
    +							for (JobDetails job : jobDetails.getFinishedJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
     							synchronized (metrics) {
     								metrics.jobs.keySet().retainAll(toRetain);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
    +					},
    +					executor);
     
    -				String jobManagerPath = jobManager.path();
    -				String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
    -				ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);
    +				String jobManagerPath = jobManagerGateway.getAddress();
    +				String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
     
    -				queryMetrics(jobManagerQueryService);
    +				retrieveAndQueryMetrics(jmQueryServicePath);
     
     				/**
     				 * We first request the list of all registered task managers from the job manager, and then
     				 * request the respective metric dump from each task manager.
     				 *
     				 * <p>All stored metrics that do not belong to a registered task manager will be removed.
     				 */
    -				Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
    -				registeredTaskManagersFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable();
    -							List<String> activeTaskManagers = new ArrayList<>();
    -							for (Instance taskManager : taskManagers) {
    -								activeTaskManagers.add(taskManager.getId().toString());
    -
    -								String taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
    -								String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
    -								ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);
    -
    -								queryMetrics(taskManagerQueryService);
    -							}
    -							synchronized (metrics) { // remove all metrics belonging to unregistered task managers
    +				CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
    +
    +				taskManagersFuture.whenCompleteAsync(
    +					(Collection<Instance> taskManagers, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
    +						} else {
    +							List<String> activeTaskManagers = taskManagers.stream().map(
    +								taskManagerInstance -> {
    +									final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
    +									final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
    +
    +									retrieveAndQueryMetrics(tmQueryServicePath);
    +
    +									return taskManagerInstance.getId().toString();
    +								}).collect(Collectors.toList());
    +
    +							synchronized (metrics) {
     								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed.");
    +					},
    +					executor);
     			}
     		} catch (Exception e) {
     			LOG.warn("Exception while fetching metrics.", e);
     		}
     	}
     
    -	private void logErrorOnFailure(Future<Object> future, final String message) {
    -		future.onFailure(new OnFailure() {
    -			@Override
    -			public void onFailure(Throwable failure) throws Throwable {
    -				LOG.debug(message, failure);
    -			}
    -		}, ctx);
    -	}
    -
     	/**
    -	 * Requests a metric dump from the given actor.
    +	 * Retrieves and queries the specified QueryServiceGateway.
     	 *
    -	 * @param actor ActorRef to request the dump from
    -     */
    -	private void queryMetrics(ActorRef actor) {
    -		Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
    -		metricQueryFuture
    -			.onSuccess(new OnSuccess<Object>() {
    -				@Override
    -				public void onSuccess(Object result) throws Throwable {
    -					addMetrics(result);
    +	 * @param queryServicePath specifying the QueryServiceGateway
    +	 */
    +	private void retrieveAndQueryMetrics(String queryServicePath) {
    +		final CompletableFuture<MetricQueryServiceGateway> jmQueryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
    --- End diff --
    
    The variable name `jmQueryServiceGatewayFuture` is misleading, as we could also be retrieving a TaskManager query service.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132141995
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---
    @@ -101,127 +98,112 @@ public void update() {
     
     	private void fetchMetrics() {
     		try {
    -			Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
    -			if (jobManagerGatewayAndWebPort.isDefined()) {
    -				ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
    +			Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow();
    +			if (optJobManagerGateway.isPresent()) {
    +				final JobManagerGateway jobManagerGateway = optJobManagerGateway.get();
     
     				/**
     				 * Remove all metrics that belong to a job that is not running and no longer archived.
     				 */
    -				Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout);
    -				jobDetailsFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							MultipleJobsDetails details = (MultipleJobsDetails) result;
    +				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
    +
    +				jobDetailsFuture.whenCompleteAsync(
    +					(MultipleJobsDetails jobDetails, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching of JobDetails failed.", throwable);
    +						} else {
     							ArrayList<String> toRetain = new ArrayList<>();
    -							for (JobDetails job : details.getRunningJobs()) {
    +							for (JobDetails job : jobDetails.getRunningJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
    -							for (JobDetails job : details.getFinishedJobs()) {
    +							for (JobDetails job : jobDetails.getFinishedJobs()) {
     								toRetain.add(job.getJobId().toString());
     							}
     							synchronized (metrics) {
     								metrics.jobs.keySet().retainAll(toRetain);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
    +					},
    +					executor);
     
    -				String jobManagerPath = jobManager.path();
    -				String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
    -				ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);
    +				String jobManagerPath = jobManagerGateway.getAddress();
    +				String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
     
    -				queryMetrics(jobManagerQueryService);
    +				retrieveAndQueryMetrics(jmQueryServicePath);
     
     				/**
     				 * We first request the list of all registered task managers from the job manager, and then
     				 * request the respective metric dump from each task manager.
     				 *
     				 * <p>All stored metrics that do not belong to a registered task manager will be removed.
     				 */
    -				Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
    -				registeredTaskManagersFuture
    -					.onSuccess(new OnSuccess<Object>() {
    -						@Override
    -						public void onSuccess(Object result) throws Throwable {
    -							Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable();
    -							List<String> activeTaskManagers = new ArrayList<>();
    -							for (Instance taskManager : taskManagers) {
    -								activeTaskManagers.add(taskManager.getId().toString());
    -
    -								String taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
    -								String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
    -								ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);
    -
    -								queryMetrics(taskManagerQueryService);
    -							}
    -							synchronized (metrics) { // remove all metrics belonging to unregistered task managers
    +				CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
    +
    +				taskManagersFuture.whenCompleteAsync(
    +					(Collection<Instance> taskManagers, Throwable throwable) -> {
    +						if (throwable != null) {
    +							LOG.debug("Fetching list of registered TaskManagers failed.", throwable);
    +						} else {
    +							List<String> activeTaskManagers = taskManagers.stream().map(
    +								taskManagerInstance -> {
    +									final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress();
    +									final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString();
    +
    +									retrieveAndQueryMetrics(tmQueryServicePath);
    +
    +									return taskManagerInstance.getId().toString();
    +								}).collect(Collectors.toList());
    +
    +							synchronized (metrics) {
     								metrics.taskManagers.keySet().retainAll(activeTaskManagers);
     							}
     						}
    -					}, ctx);
    -				logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed.");
    +					},
    +					executor);
     			}
     		} catch (Exception e) {
     			LOG.warn("Exception while fetching metrics.", e);
     		}
     	}
     
    -	private void logErrorOnFailure(Future<Object> future, final String message) {
    -		future.onFailure(new OnFailure() {
    -			@Override
    -			public void onFailure(Throwable failure) throws Throwable {
    -				LOG.debug(message, failure);
    -			}
    -		}, ctx);
    -	}
    -
     	/**
    -	 * Requests a metric dump from the given actor.
    +	 * Retrieves and queries the specified QueryServiceGateway.
     	 *
    -	 * @param actor ActorRef to request the dump from
    -     */
    -	private void queryMetrics(ActorRef actor) {
    -		Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
    -		metricQueryFuture
    -			.onSuccess(new OnSuccess<Object>() {
    -				@Override
    -				public void onSuccess(Object result) throws Throwable {
    -					addMetrics(result);
    +	 * @param queryServicePath specifying the QueryServiceGateway
    +	 */
    +	private void retrieveAndQueryMetrics(String queryServicePath) {
    +		final CompletableFuture<MetricQueryServiceGateway> jmQueryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath);
    --- End diff --
    
    True, an artifact of a refactoring. Will change it.
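
A minimal sketch of what the rename could look like, not the code from the PR. The interfaces `QueryServiceGateway` and `QueryServiceRetriever` are hypothetical stand-ins for `MetricQueryServiceGateway` and `MetricQueryServiceRetriever`, and the `dumps` map is an assumed placeholder for the real metric store. The point is only that the future gets a neutral name, because the same method serves both JobManager and TaskManager query service paths.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

class MetricFetcherSketch {

    /** Hypothetical stand-in for MetricQueryServiceGateway (not the Flink interface). */
    interface QueryServiceGateway {
        CompletableFuture<String> queryMetrics();
    }

    /** Hypothetical stand-in for MetricQueryServiceRetriever (not the Flink interface). */
    interface QueryServiceRetriever {
        CompletableFuture<QueryServiceGateway> retrieveService(String queryServicePath);
    }

    private final QueryServiceRetriever queryServiceRetriever;
    private final Executor executor;

    /** Assumed placeholder for the metric store: last dump per query service path. */
    final Map<String, String> dumps = new ConcurrentHashMap<>();

    MetricFetcherSketch(QueryServiceRetriever queryServiceRetriever, Executor executor) {
        this.queryServiceRetriever = queryServiceRetriever;
        this.executor = executor;
    }

    CompletableFuture<Void> retrieveAndQueryMetrics(String queryServicePath) {
        // Neutral name: the path may identify a JobManager or a TaskManager query service.
        final CompletableFuture<QueryServiceGateway> queryServiceGatewayFuture =
            queryServiceRetriever.retrieveService(queryServicePath);

        return queryServiceGatewayFuture
            .thenCompose(QueryServiceGateway::queryMetrics)
            .<Void>handleAsync(
                (dump, throwable) -> {
                    // On failure the sketch simply stores nothing; the PR logs at debug level instead.
                    if (throwable == null && dump != null) {
                        dumps.put(queryServicePath, dump);
                    }
                    return null;
                },
                executor);
    }
}
```

Calling `retrieveAndQueryMetrics` once with a JobManager path and once with a TaskManager path goes through the same variable, which is why a `jm` prefix would no longer fit.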



[GitHub] flink pull request #4492: [FLINK-7381] [web] Decouple WebRuntimeMonitor from...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4492#discussion_r132141830
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/WebMonitorOptions.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.configuration;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +
    +/**
    + * Configuration options for the WebRuntimeMonitor
    + */
    +@PublicEvolving
    +public class WebMonitorOptions {
    +
    +	/**
    +	 * Timeout for asynchronous operations by the WebRuntimeMonitor
    +	 */
    +	public static final ConfigOption<Long> WEB_TIMEOUT = ConfigOptions
    +		.key("web.timeout")
    --- End diff --
    
    Yes, see my comments above. I think the existing web monitor options are a bit misleading because the web monitor is not necessarily bound to the `JobManager`.
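
To illustrate the naming argument, here is a rough sketch that reads a `web.timeout` key from plain `java.util.Properties`. It does not use Flink's `ConfigOption` API. The class `WebTimeoutSketch`, the caller-supplied default, and the millisecond unit are all assumptions made for illustration only. What it shows is that the key carries no `jobmanager` prefix, so whichever component hosts the web monitor can read it.

```java
import java.time.Duration;
import java.util.Properties;

class WebTimeoutSketch {

    /** Key taken from the diff above; it is not tied to the JobManager by name. */
    static final String WEB_TIMEOUT_KEY = "web.timeout";

    /**
     * Reads the timeout, falling back to a caller-supplied default.
     * Interpreting the value as milliseconds is an assumption of this sketch.
     */
    static Duration webTimeout(Properties config, long assumedDefaultMillis) {
        String raw = config.getProperty(WEB_TIMEOUT_KEY);
        long millis = (raw == null) ? assumedDefaultMillis : Long.parseLong(raw.trim());
        return Duration.ofMillis(millis);
    }
}
```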

