You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/09/27 12:20:19 UTC

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/4734

    [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new REST endpoint

    ## What is the purpose of the change
    
    This PR ports the existing `CurrentJobIdsHandler` to the new REST endpoint. The existing class `JobsWithIdsOverview` is reused as the response message (second commit renames the class to `JobStatusesWithIdsOverview`). Introduced `CurrentJobIdsHeaders` as part of the porting.
    
    This PR also includes a fix that the previous response message only contained job ids for statuses `RUNNING`, `FAILED`, `FINISHED`, and `CANCELLED`, whereas there are actually more statuses to differentiate, such as `CREATED`, `RESTARTING`, `SUSPENDED`, etc.
    
    ## Brief change log
    
    - Let `CurrentJobIdsHandler` implement the `LegacyRestHandler` interface
    - Introduced `CurrentJobIdsHeaders`
    - Added new methods to `DispatcherGateway` and `JobMasterGateway` to facilitate the porting
    - Register handler at `DispatcherRestEndpoint`
    - Reuse `JobsWithIdsOverview` class as response message (renamed to `JobStatusesWithIdsOverview`)
    - Let response message differentiate between all possible `JobStatus`.
    
    ## Verifying this change
    
    This change is already covered by existing tests.
    
    A new `JobStatusesWithIdsOverviewTest` is added to test the message marshalling.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? n/a
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink portCurrentJobIdsHandler

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4734.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4734
    
----
commit 4f3ad7a5d6566e23bb5138b0115e0863cb445a67
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-09-27T11:02:32Z

    [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new REST endpoint

commit c8653b1cf78a2695db757b5c9a89b1125c74f1ea
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-09-27T11:20:07Z

    [FLINK-7652] [flip6] Rename JobsWithIdsOverview message class name
    
    Renamed to JobsStatusesWithIdsOverview, to better respresent the
    information that the message holds.

commit 662fe9cb64849b8af7758f9facaaa1dc65e6fb93
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-09-27T12:09:43Z

    [FLINK-7652] [flip6] Let JobStatusesWithIdsOverview differentiate between all possible JobStatuses
    
    Prior to this commit, the JobStatusesWithIdsOverview only differentiates
    between RUNNING, FAILED, CANCELLED, and FINISHED. This commit fixes that
    to cover all possible JobStatus, including CREATED, CANCELLING, FAILING,
    RESTARTING, SUSPENDED, and RECONCILING.

----


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    nvm, this isn't used by the web UI:


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r143983019
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusesWithIdsOverview.java ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.messages.webmonitor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import com.fasterxml.jackson.core.JsonParser;
    +import com.fasterxml.jackson.databind.DeserializationContext;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.SerializerProvider;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
    +import com.fasterxml.jackson.databind.ser.std.StdSerializer;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * An overview of how many jobs are in which status.
    + */
    +@JsonSerialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewSerializer.class)
    +@JsonDeserialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewDeserializer.class)
    +public class JobStatusesWithIdsOverview implements ResponseBody, InfoMessage {
    +
    +	private static final long serialVersionUID = -3699051943490133183L;
    +
    +	public static final String FIELD_NAME_JOBS_CREATED_IDS = "jobs-created";
    +	public static final String FIELD_NAME_JOBS_RUNNING_IDS = "jobs-running";
    +	public static final String FIELD_NAME_JOBS_FINISHED_IDS = "jobs-finished";
    +	public static final String FIELD_NAME_JOBS_CANCELLING_IDS = "jobs-cancelling";
    +	public static final String FIELD_NAME_JOBS_CANCELLED_IDS = "jobs-cancelled";
    +	public static final String FIELD_NAME_JOBS_FAILING_IDS = "jobs-failing";
    +	public static final String FIELD_NAME_JOBS_FAILED_IDS = "jobs-failed";
    +	public static final String FIELD_NAME_JOBS_RESTARTING_IDS = "jobs-restarting";
    +	public static final String FIELD_NAME_JOBS_SUSPENDED_IDS = "jobs-suspended";
    +	public static final String FIELD_NAME_JOBS_RECONCILING_IDS = "jobs-reconciling";
    +
    +	private final List<JobID> jobsCreated;
    +	private final List<JobID> jobsRunningOrPending;
    +	private final List<JobID> jobsFinished;
    +	private final List<JobID> jobsCancelling;
    +	private final List<JobID> jobsCancelled;
    +	private final List<JobID> jobsFailing;
    +	private final List<JobID> jobsFailed;
    +	private final List<JobID> jobsRestarting;
    +	private final List<JobID> jobsSuspended;
    +	private final List<JobID> jobsReconciling;
    +
    +	public JobStatusesWithIdsOverview(
    +			List<JobID> jobsCreated,
    +			List<JobID> jobsRunningOrPending,
    +			List<JobID> jobsFinished,
    +			List<JobID> jobsCancelling,
    +			List<JobID> jobsCancelled,
    +			List<JobID> jobsFailing,
    +			List<JobID> jobsFailed,
    +			List<JobID> jobsRestarting,
    +			List<JobID> jobsSuspended,
    +			List<JobID> jobsReconciling) {
    +
    +		this.jobsCreated = checkNotNull(jobsCreated);
    +		this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
    +		this.jobsFinished = checkNotNull(jobsFinished);
    +		this.jobsCancelling = checkNotNull(jobsCancelling);
    +		this.jobsCancelled = checkNotNull(jobsCancelled);
    +		this.jobsFailing = checkNotNull(jobsFailing);
    +		this.jobsFailed = checkNotNull(jobsFailed);
    +		this.jobsRestarting = checkNotNull(jobsRestarting);
    +		this.jobsSuspended = checkNotNull(jobsSuspended);
    +		this.jobsReconciling = checkNotNull(jobsReconciling);
    +	}
    +
    +	public JobStatusesWithIdsOverview(JobStatusesWithIdsOverview first, JobStatusesWithIdsOverview second) {
    +		this.jobsCreated = combine(first.getJobsCreated(), second.getJobsCreated());
    +		this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
    +		this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
    +		this.jobsCancelling = combine(first.getJobsCancelling(), second.getJobsCancelling());
    +		this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
    +		this.jobsFailing = combine(first.getJobsFailing(), second.getJobsFailing());
    +		this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
    +		this.jobsRestarting = combine(first.getJobsRestarting(), second.getJobsRestarting());
    +		this.jobsSuspended = combine(first.getJobsSuspended(), second.getJobsSuspended());
    +		this.jobsReconciling = combine(first.getJobsReconciling(), second.getJobsReconciling());
    +	}
    +
    +	public List<JobID> getJobsCreated() {
    +		return jobsCreated;
    +	}
    +
    +	public List<JobID> getJobsRunningOrPending() {
    +		return jobsRunningOrPending;
    +	}
    +
    +	public List<JobID> getJobsFinished() {
    +		return jobsFinished;
    +	}
    +
    +	public List<JobID> getJobsCancelling() {
    +		return jobsCancelling;
    +	}
    +
    +	public List<JobID> getJobsCancelled() {
    +		return jobsCancelled;
    +	}
    +
    +	public List<JobID> getJobsFailing() {
    +		return jobsFailing;
    +	}
    +
    +	public List<JobID> getJobsFailed() {
    +		return jobsFailed;
    +	}
    +
    +	public List<JobID> getJobsRestarting() {
    +		return jobsRestarting;
    +	}
    +
    +	public List<JobID> getJobsSuspended() {
    +		return jobsSuspended;
    +	}
    +
    +	public List<JobID> getJobsReconciling() {
    +		return jobsReconciling;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +
    +	@Override
    +	public int hashCode() {
    +		return jobsCreated.hashCode() ^
    +				jobsRunningOrPending.hashCode() ^
    +				jobsFinished.hashCode() ^
    +				jobsCancelling.hashCode() ^
    +				jobsCancelled.hashCode() ^
    +				jobsFailing.hashCode() ^
    +				jobsFailed.hashCode() ^
    +				jobsRestarting.hashCode() ^
    +				jobsSuspended.hashCode() ^
    +				jobsReconciling.hashCode();
    +	}
    +
    +	@Override
    +	public boolean equals(Object obj) {
    +		if (obj == this) {
    +			return true;
    +		}
    +		else if (obj instanceof JobStatusesWithIdsOverview) {
    +			JobStatusesWithIdsOverview that = (JobStatusesWithIdsOverview) obj;
    +			return this.jobsCreated.equals(that.jobsCreated) &&
    +					this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
    +					this.jobsFinished.equals(that.jobsFinished) &&
    +					this.jobsCancelling.equals(that.jobsCancelling) &&
    +					this.jobsCancelled.equals(that.jobsCancelled) &&
    +					this.jobsFailing.equals(that.jobsFailing) &&
    +					this.jobsFailed.equals(that.jobsFailed) &&
    +					this.jobsRestarting.equals(that.jobsRestarting) &&
    +					this.jobsSuspended.equals(that.jobsSuspended) &&
    +					this.jobsReconciling.equals(that.jobsReconciling);
    +		}
    +		else {
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "JobStatusesWithIdsOverview {" +
    +				"createdJobs=" + jobsCreated +
    +				", runningOrPendingJobs=" + jobsRunningOrPending +
    +				", finishedJobs=" + jobsFinished +
    +				", cancellingJobs=" + jobsCancelling +
    +				", cancelledJobs=" + jobsCancelled +
    +				", failingJobs=" + jobsFailing +
    +				", failedJobs=" + jobsFailed +
    +				", restartingJobs=" + jobsRestarting +
    +				", suspendedJobs=" + jobsSuspended +
    +				", reconcilingJobs=" + jobsReconciling +
    +				'}';
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Message serializers
    +	// ------------------------------------------------------------------------
    +
    +	public static final class JobStatusesWithIdsOverviewSerializer extends StdSerializer<JobStatusesWithIdsOverview> {
    --- End diff --
    
    I think it was due to the serialization of `JobID`s. Maybe what should be enough is to define a (de)serializer for the `JobID` class?


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r141575013
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java ---
    @@ -68,36 +84,9 @@ public CurrentJobIdsHandler(Executor executor, Time timeout) {
     
     						StringWriter writer = new StringWriter();
     						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
    -
    -						gen.writeStartObject();
    -
    -						gen.writeArrayFieldStart("jobs-running");
    -						for (JobID jid : overview.getJobsRunningOrPending()) {
    -							gen.writeString(jid.toString());
    -						}
    -						gen.writeEndArray();
    -
    -						gen.writeArrayFieldStart("jobs-finished");
    -						for (JobID jid : overview.getJobsFinished()) {
    -							gen.writeString(jid.toString());
    -						}
    -						gen.writeEndArray();
    -
    -						gen.writeArrayFieldStart("jobs-cancelled");
    -						for (JobID jid : overview.getJobsCancelled()) {
    -							gen.writeString(jid.toString());
    -						}
    -						gen.writeEndArray();
    -
    -						gen.writeArrayFieldStart("jobs-failed");
    -						for (JobID jid : overview.getJobsFailed()) {
    -							gen.writeString(jid.toString());
    -						}
    -						gen.writeEndArray();
    -
    -						gen.writeEndObject();
    -
    +						new JobsWithIDsOverview.JobsWithIDsOverviewSerializer().serialize(overview, gen, null);
    --- End diff --
    
    we could store the serializer in a field.


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r149057297
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusesWithIdsOverview.java ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.messages.webmonitor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * An overview of how many jobs are in which status.
    + */
    +public class JobStatusesWithIdsOverview implements ResponseBody, InfoMessage {
    +
    +	private static final long serialVersionUID = -3699051943490133183L;
    +
    +	public static final String FIELD_NAME_JOBS_CREATED_IDS = "jobs-created";
    +	public static final String FIELD_NAME_JOBS_RUNNING_IDS = "jobs-running";
    +	public static final String FIELD_NAME_JOBS_FINISHED_IDS = "jobs-finished";
    +	public static final String FIELD_NAME_JOBS_CANCELLING_IDS = "jobs-cancelling";
    +	public static final String FIELD_NAME_JOBS_CANCELLED_IDS = "jobs-cancelled";
    +	public static final String FIELD_NAME_JOBS_FAILING_IDS = "jobs-failing";
    +	public static final String FIELD_NAME_JOBS_FAILED_IDS = "jobs-failed";
    +	public static final String FIELD_NAME_JOBS_RESTARTING_IDS = "jobs-restarting";
    +	public static final String FIELD_NAME_JOBS_SUSPENDED_IDS = "jobs-suspended";
    +	public static final String FIELD_NAME_JOBS_RECONCILING_IDS = "jobs-reconciling";
    +
    +	@JsonProperty(FIELD_NAME_JOBS_CREATED_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsCreated;
    +
    +	@JsonProperty(FIELD_NAME_JOBS_RUNNING_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsRunningOrPending;
    +
    +	@JsonProperty(FIELD_NAME_JOBS_FINISHED_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsFinished;
    +
    +	@JsonProperty(FIELD_NAME_JOBS_CANCELLING_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsCancelling;
    +
    +	@JsonProperty(FIELD_NAME_JOBS_CANCELLED_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsCancelled;
    +
    +	@JsonProperty(FIELD_NAME_JOBS_FAILING_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsFailing;
    +
    +	@JsonProperty(FIELD_NAME_JOBS_FAILED_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsFailed;
    +
    +	@JsonProperty(FIELD_NAME_JOBS_RESTARTING_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsRestarting;
    +
    +	@JsonProperty(FIELD_NAME_JOBS_SUSPENDED_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsSuspended;
    +
    +	@JsonProperty(FIELD_NAME_JOBS_RECONCILING_IDS)
    +	@JsonSerialize(contentUsing = JobIDSerializer.class)
    +	private final List<JobID> jobsReconciling;
    --- End diff --
    
    I would actually not group the jobs here. Rather we could return a collection of `JobIDs` with `JobStatus`. If you need to present a grouped overview, then the client has to calculate it. That would simplify this class and the handler considerably. What do you think?


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    Hi @tillrohrmann, just to double check, so does the conclusion mean that #4805 subsumes this PR, and this one can be closed? 


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    I think @zentol is right and we can register the `CurrentJobsOverviewHandler` under `jobs/overview`. Thus, we should add this handler as well. Could you rebase this handler onto the latest master such that we can merge it?


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    I just talked to @zentol and we both agreed that we should register the `CurrentJobsOverviewHandler` under `jobs/overview`. Moreover, we should change the `CurrentJobIdsHandler` such that it simply returns all available job ids as a simple list. Maybe we could rename it into `JobIdsHandler` or `JobsHandler`.


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r143973267
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -242,6 +248,86 @@ public void start() throws Exception {
     	}
     
     	@Override
    +	public CompletableFuture<JobStatusesWithIdsOverview> requestJobIdsOverview(@RpcTimeout Time timeout) {
    --- End diff --
    
    I think that makes a lot of sense. Will address this.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    Hi @tillrohrmann, this PR is now rebased to the latest master, and reworked to incorporate your last comments.


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r141580505
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java ---
    @@ -68,36 +84,9 @@ public CurrentJobIdsHandler(Executor executor, Time timeout) {
     
     						StringWriter writer = new StringWriter();
     						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
    -
    -						gen.writeStartObject();
    -
    -						gen.writeArrayFieldStart("jobs-running");
    -						for (JobID jid : overview.getJobsRunningOrPending()) {
    -							gen.writeString(jid.toString());
    -						}
    -						gen.writeEndArray();
    -
    -						gen.writeArrayFieldStart("jobs-finished");
    -						for (JobID jid : overview.getJobsFinished()) {
    -							gen.writeString(jid.toString());
    -						}
    -						gen.writeEndArray();
    -
    -						gen.writeArrayFieldStart("jobs-cancelled");
    -						for (JobID jid : overview.getJobsCancelled()) {
    -							gen.writeString(jid.toString());
    -						}
    -						gen.writeEndArray();
    -
    -						gen.writeArrayFieldStart("jobs-failed");
    -						for (JobID jid : overview.getJobsFailed()) {
    -							gen.writeString(jid.toString());
    -						}
    -						gen.writeEndArray();
    -
    -						gen.writeEndObject();
    -
    +						new JobsWithIDsOverview.JobsWithIDsOverviewSerializer().serialize(overview, gen, null);
    --- End diff --
    
    Will change.


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r143718709
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusesWithIdsOverview.java ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.messages.webmonitor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import com.fasterxml.jackson.core.JsonParser;
    +import com.fasterxml.jackson.databind.DeserializationContext;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.SerializerProvider;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
    +import com.fasterxml.jackson.databind.ser.std.StdSerializer;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * An overview of how many jobs are in which status.
    + */
    +@JsonSerialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewSerializer.class)
    +@JsonDeserialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewDeserializer.class)
    +public class JobStatusesWithIdsOverview implements ResponseBody, InfoMessage {
    +
    +	private static final long serialVersionUID = -3699051943490133183L;
    +
    +	public static final String FIELD_NAME_JOBS_CREATED_IDS = "jobs-created";
    +	public static final String FIELD_NAME_JOBS_RUNNING_IDS = "jobs-running";
    +	public static final String FIELD_NAME_JOBS_FINISHED_IDS = "jobs-finished";
    +	public static final String FIELD_NAME_JOBS_CANCELLING_IDS = "jobs-cancelling";
    +	public static final String FIELD_NAME_JOBS_CANCELLED_IDS = "jobs-cancelled";
    +	public static final String FIELD_NAME_JOBS_FAILING_IDS = "jobs-failing";
    +	public static final String FIELD_NAME_JOBS_FAILED_IDS = "jobs-failed";
    +	public static final String FIELD_NAME_JOBS_RESTARTING_IDS = "jobs-restarting";
    +	public static final String FIELD_NAME_JOBS_SUSPENDED_IDS = "jobs-suspended";
    +	public static final String FIELD_NAME_JOBS_RECONCILING_IDS = "jobs-reconciling";
    +
    +	private final List<JobID> jobsCreated;
    +	private final List<JobID> jobsRunningOrPending;
    +	private final List<JobID> jobsFinished;
    +	private final List<JobID> jobsCancelling;
    +	private final List<JobID> jobsCancelled;
    +	private final List<JobID> jobsFailing;
    +	private final List<JobID> jobsFailed;
    +	private final List<JobID> jobsRestarting;
    +	private final List<JobID> jobsSuspended;
    +	private final List<JobID> jobsReconciling;
    +
    +	public JobStatusesWithIdsOverview(
    +			List<JobID> jobsCreated,
    +			List<JobID> jobsRunningOrPending,
    +			List<JobID> jobsFinished,
    +			List<JobID> jobsCancelling,
    +			List<JobID> jobsCancelled,
    +			List<JobID> jobsFailing,
    +			List<JobID> jobsFailed,
    +			List<JobID> jobsRestarting,
    +			List<JobID> jobsSuspended,
    +			List<JobID> jobsReconciling) {
    +
    +		this.jobsCreated = checkNotNull(jobsCreated);
    +		this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
    +		this.jobsFinished = checkNotNull(jobsFinished);
    +		this.jobsCancelling = checkNotNull(jobsCancelling);
    +		this.jobsCancelled = checkNotNull(jobsCancelled);
    +		this.jobsFailing = checkNotNull(jobsFailing);
    +		this.jobsFailed = checkNotNull(jobsFailed);
    +		this.jobsRestarting = checkNotNull(jobsRestarting);
    +		this.jobsSuspended = checkNotNull(jobsSuspended);
    +		this.jobsReconciling = checkNotNull(jobsReconciling);
    +	}
    +
    +	public JobStatusesWithIdsOverview(JobStatusesWithIdsOverview first, JobStatusesWithIdsOverview second) {
    +		this.jobsCreated = combine(first.getJobsCreated(), second.getJobsCreated());
    +		this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
    +		this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
    +		this.jobsCancelling = combine(first.getJobsCancelling(), second.getJobsCancelling());
    +		this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
    +		this.jobsFailing = combine(first.getJobsFailing(), second.getJobsFailing());
    +		this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
    +		this.jobsRestarting = combine(first.getJobsRestarting(), second.getJobsRestarting());
    +		this.jobsSuspended = combine(first.getJobsSuspended(), second.getJobsSuspended());
    +		this.jobsReconciling = combine(first.getJobsReconciling(), second.getJobsReconciling());
    +	}
    +
    +	public List<JobID> getJobsCreated() {
    +		return jobsCreated;
    +	}
    +
    +	public List<JobID> getJobsRunningOrPending() {
    +		return jobsRunningOrPending;
    +	}
    +
    +	public List<JobID> getJobsFinished() {
    +		return jobsFinished;
    +	}
    +
    +	public List<JobID> getJobsCancelling() {
    +		return jobsCancelling;
    +	}
    +
    +	public List<JobID> getJobsCancelled() {
    +		return jobsCancelled;
    +	}
    +
    +	public List<JobID> getJobsFailing() {
    +		return jobsFailing;
    +	}
    +
    +	public List<JobID> getJobsFailed() {
    +		return jobsFailed;
    +	}
    +
    +	public List<JobID> getJobsRestarting() {
    +		return jobsRestarting;
    +	}
    +
    +	public List<JobID> getJobsSuspended() {
    +		return jobsSuspended;
    +	}
    +
    +	public List<JobID> getJobsReconciling() {
    +		return jobsReconciling;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +
    +	@Override
    +	public int hashCode() {
    +		return jobsCreated.hashCode() ^
    +				jobsRunningOrPending.hashCode() ^
    +				jobsFinished.hashCode() ^
    +				jobsCancelling.hashCode() ^
    +				jobsCancelled.hashCode() ^
    +				jobsFailing.hashCode() ^
    +				jobsFailed.hashCode() ^
    +				jobsRestarting.hashCode() ^
    +				jobsSuspended.hashCode() ^
    +				jobsReconciling.hashCode();
    +	}
    +
    +	@Override
    +	public boolean equals(Object obj) {
    +		if (obj == this) {
    +			return true;
    +		}
    +		else if (obj instanceof JobStatusesWithIdsOverview) {
    +			JobStatusesWithIdsOverview that = (JobStatusesWithIdsOverview) obj;
    +			return this.jobsCreated.equals(that.jobsCreated) &&
    +					this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
    +					this.jobsFinished.equals(that.jobsFinished) &&
    +					this.jobsCancelling.equals(that.jobsCancelling) &&
    +					this.jobsCancelled.equals(that.jobsCancelled) &&
    +					this.jobsFailing.equals(that.jobsFailing) &&
    +					this.jobsFailed.equals(that.jobsFailed) &&
    +					this.jobsRestarting.equals(that.jobsRestarting) &&
    +					this.jobsSuspended.equals(that.jobsSuspended) &&
    +					this.jobsReconciling.equals(that.jobsReconciling);
    +		}
    +		else {
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "JobStatusesWithIdsOverview {" +
    +				"createdJobs=" + jobsCreated +
    +				", runningOrPendingJobs=" + jobsRunningOrPending +
    +				", finishedJobs=" + jobsFinished +
    +				", cancellingJobs=" + jobsCancelling +
    +				", cancelledJobs=" + jobsCancelled +
    +				", failingJobs=" + jobsFailing +
    +				", failedJobs=" + jobsFailed +
    +				", restartingJobs=" + jobsRestarting +
    +				", suspendedJobs=" + jobsSuspended +
    +				", reconcilingJobs=" + jobsReconciling +
    +				'}';
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Message serializers
    +	// ------------------------------------------------------------------------
    +
    +	public static final class JobStatusesWithIdsOverviewSerializer extends StdSerializer<JobStatusesWithIdsOverview> {
    +
    +		private static final long serialVersionUID = 1009738627775768331L;
    +
    +		public JobStatusesWithIdsOverviewSerializer() {
    +			super(JobStatusesWithIdsOverview.class);
    +		}
    +
    +		@Override
    +		public void serialize(
    +				JobStatusesWithIdsOverview jobStatusesWithIdsOverview,
    +				JsonGenerator jsonGenerator,
    +				SerializerProvider serializerProvider) throws IOException {
    +
    +			jsonGenerator.writeStartObject();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_CREATED_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsCreated()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_RUNNING_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsRunningOrPending()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_FINISHED_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsFinished()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_CANCELLING_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsCancelling()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_CANCELLED_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsCancelled()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_FAILING_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsFailing()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_FAILED_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsFailed()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_RESTARTING_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsRestarting()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_SUSPENDED_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsSuspended()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeArrayFieldStart(FIELD_NAME_JOBS_RECONCILING_IDS);
    +			for (JobID jid : jobStatusesWithIdsOverview.getJobsReconciling()) {
    +				jsonGenerator.writeString(jid.toString());
    +			}
    +			jsonGenerator.writeEndArray();
    +
    +			jsonGenerator.writeEndObject();
    +		}
    +	}
    +
    +	public static final class JobStatusesWithIdsOverviewDeserializer extends StdDeserializer<JobStatusesWithIdsOverview> {
    +
    +		private static final long serialVersionUID = 4733739296433630723L;
    +
    +		public JobStatusesWithIdsOverviewDeserializer() {
    +			super(JobStatusesWithIdsOverview.class);
    +		}
    +
    +		@Override
    +		public JobStatusesWithIdsOverview deserialize(
    +				JsonParser jsonParser,
    +				DeserializationContext deserializationContext) throws IOException {
    +
    +			JsonNode rootNode = jsonParser.readValueAsTree();
    +
    +			List<JobID> jobsCreated = new LinkedList<>();
    --- End diff --
    
    I think we should rather use `ArrayList` instead of `LinkedList`.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    Actually, I'd like to throw out the entire handler and replace it with the `CurrentJobsOverviewHandler`. The CurrentJobIdshandler handler is _not_ used by the web UI as the listings of jobs in the UI go through `/joboverview` (that i would prefer to go through `/jobs`).


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r143998745
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusesWithIdsOverview.java ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.messages.webmonitor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import com.fasterxml.jackson.core.JsonParser;
    +import com.fasterxml.jackson.databind.DeserializationContext;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.SerializerProvider;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
    +import com.fasterxml.jackson.databind.ser.std.StdSerializer;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * An overview of how many jobs are in which status.
    + */
    +@JsonSerialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewSerializer.class)
    +@JsonDeserialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewDeserializer.class)
    +public class JobStatusesWithIdsOverview implements ResponseBody, InfoMessage {
    +
    +	private static final long serialVersionUID = -3699051943490133183L;
    +
    +	public static final String FIELD_NAME_JOBS_CREATED_IDS = "jobs-created";
    +	public static final String FIELD_NAME_JOBS_RUNNING_IDS = "jobs-running";
    +	public static final String FIELD_NAME_JOBS_FINISHED_IDS = "jobs-finished";
    +	public static final String FIELD_NAME_JOBS_CANCELLING_IDS = "jobs-cancelling";
    +	public static final String FIELD_NAME_JOBS_CANCELLED_IDS = "jobs-cancelled";
    +	public static final String FIELD_NAME_JOBS_FAILING_IDS = "jobs-failing";
    +	public static final String FIELD_NAME_JOBS_FAILED_IDS = "jobs-failed";
    +	public static final String FIELD_NAME_JOBS_RESTARTING_IDS = "jobs-restarting";
    +	public static final String FIELD_NAME_JOBS_SUSPENDED_IDS = "jobs-suspended";
    +	public static final String FIELD_NAME_JOBS_RECONCILING_IDS = "jobs-reconciling";
    +
    +	private final List<JobID> jobsCreated;
    +	private final List<JobID> jobsRunningOrPending;
    +	private final List<JobID> jobsFinished;
    +	private final List<JobID> jobsCancelling;
    +	private final List<JobID> jobsCancelled;
    +	private final List<JobID> jobsFailing;
    +	private final List<JobID> jobsFailed;
    +	private final List<JobID> jobsRestarting;
    +	private final List<JobID> jobsSuspended;
    +	private final List<JobID> jobsReconciling;
    +
    +	public JobStatusesWithIdsOverview(
    +			List<JobID> jobsCreated,
    +			List<JobID> jobsRunningOrPending,
    +			List<JobID> jobsFinished,
    +			List<JobID> jobsCancelling,
    +			List<JobID> jobsCancelled,
    +			List<JobID> jobsFailing,
    +			List<JobID> jobsFailed,
    +			List<JobID> jobsRestarting,
    +			List<JobID> jobsSuspended,
    +			List<JobID> jobsReconciling) {
    +
    +		this.jobsCreated = checkNotNull(jobsCreated);
    +		this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
    +		this.jobsFinished = checkNotNull(jobsFinished);
    +		this.jobsCancelling = checkNotNull(jobsCancelling);
    +		this.jobsCancelled = checkNotNull(jobsCancelled);
    +		this.jobsFailing = checkNotNull(jobsFailing);
    +		this.jobsFailed = checkNotNull(jobsFailed);
    +		this.jobsRestarting = checkNotNull(jobsRestarting);
    +		this.jobsSuspended = checkNotNull(jobsSuspended);
    +		this.jobsReconciling = checkNotNull(jobsReconciling);
    +	}
    +
    +	public JobStatusesWithIdsOverview(JobStatusesWithIdsOverview first, JobStatusesWithIdsOverview second) {
    +		this.jobsCreated = combine(first.getJobsCreated(), second.getJobsCreated());
    +		this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
    +		this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
    +		this.jobsCancelling = combine(first.getJobsCancelling(), second.getJobsCancelling());
    +		this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
    +		this.jobsFailing = combine(first.getJobsFailing(), second.getJobsFailing());
    +		this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
    +		this.jobsRestarting = combine(first.getJobsRestarting(), second.getJobsRestarting());
    +		this.jobsSuspended = combine(first.getJobsSuspended(), second.getJobsSuspended());
    +		this.jobsReconciling = combine(first.getJobsReconciling(), second.getJobsReconciling());
    +	}
    +
    +	public List<JobID> getJobsCreated() {
    +		return jobsCreated;
    +	}
    +
    +	public List<JobID> getJobsRunningOrPending() {
    +		return jobsRunningOrPending;
    +	}
    +
    +	public List<JobID> getJobsFinished() {
    +		return jobsFinished;
    +	}
    +
    +	public List<JobID> getJobsCancelling() {
    +		return jobsCancelling;
    +	}
    +
    +	public List<JobID> getJobsCancelled() {
    +		return jobsCancelled;
    +	}
    +
    +	public List<JobID> getJobsFailing() {
    +		return jobsFailing;
    +	}
    +
    +	public List<JobID> getJobsFailed() {
    +		return jobsFailed;
    +	}
    +
    +	public List<JobID> getJobsRestarting() {
    +		return jobsRestarting;
    +	}
    +
    +	public List<JobID> getJobsSuspended() {
    +		return jobsSuspended;
    +	}
    +
    +	public List<JobID> getJobsReconciling() {
    +		return jobsReconciling;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +
    +	@Override
    +	public int hashCode() {
    +		return jobsCreated.hashCode() ^
    +				jobsRunningOrPending.hashCode() ^
    +				jobsFinished.hashCode() ^
    +				jobsCancelling.hashCode() ^
    +				jobsCancelled.hashCode() ^
    +				jobsFailing.hashCode() ^
    +				jobsFailed.hashCode() ^
    +				jobsRestarting.hashCode() ^
    +				jobsSuspended.hashCode() ^
    +				jobsReconciling.hashCode();
    +	}
    +
    +	@Override
    +	public boolean equals(Object obj) {
    +		if (obj == this) {
    +			return true;
    +		}
    +		else if (obj instanceof JobStatusesWithIdsOverview) {
    +			JobStatusesWithIdsOverview that = (JobStatusesWithIdsOverview) obj;
    +			return this.jobsCreated.equals(that.jobsCreated) &&
    +					this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
    +					this.jobsFinished.equals(that.jobsFinished) &&
    +					this.jobsCancelling.equals(that.jobsCancelling) &&
    +					this.jobsCancelled.equals(that.jobsCancelled) &&
    +					this.jobsFailing.equals(that.jobsFailing) &&
    +					this.jobsFailed.equals(that.jobsFailed) &&
    +					this.jobsRestarting.equals(that.jobsRestarting) &&
    +					this.jobsSuspended.equals(that.jobsSuspended) &&
    +					this.jobsReconciling.equals(that.jobsReconciling);
    +		}
    +		else {
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "JobStatusesWithIdsOverview {" +
    +				"createdJobs=" + jobsCreated +
    +				", runningOrPendingJobs=" + jobsRunningOrPending +
    +				", finishedJobs=" + jobsFinished +
    +				", cancellingJobs=" + jobsCancelling +
    +				", cancelledJobs=" + jobsCancelled +
    +				", failingJobs=" + jobsFailing +
    +				", failedJobs=" + jobsFailed +
    +				", restartingJobs=" + jobsRestarting +
    +				", suspendedJobs=" + jobsSuspended +
    +				", reconcilingJobs=" + jobsReconciling +
    +				'}';
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Message serializers
    +	// ------------------------------------------------------------------------
    +
    +	public static final class JobStatusesWithIdsOverviewSerializer extends StdSerializer<JobStatusesWithIdsOverview> {
    --- End diff --
    
    👍 will change


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    Yes, I'll rebase this now.


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r143716493
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -242,6 +248,86 @@ public void start() throws Exception {
     	}
     
     	@Override
    +	public CompletableFuture<JobStatusesWithIdsOverview> requestJobIdsOverview(@RpcTimeout Time timeout) {
    +		final int numJobs = jobManagerRunners.size();
    +
    +		ArrayList<CompletableFuture<Tuple2<JobID, JobStatus>>> jobStatuses = new ArrayList<>(numJobs);
    +		for (Map.Entry<JobID, JobManagerRunner> jobManagerRunnerEntry : jobManagerRunners.entrySet()) {
    +			CompletableFuture<JobStatus> jobStatusFuture =
    +				jobManagerRunnerEntry.getValue().getJobManagerGateway().requestJobStatus(timeout);
    +
    +			jobStatuses.add(jobStatusFuture.thenApply(jobStatus -> Tuple2.of(jobManagerRunnerEntry.getKey(), jobStatus)));
    +		}
    +
    +		CompletableFuture<Collection<Tuple2<JobID, JobStatus>>> combinedJobStatusesFuture = FutureUtils.combineAll(jobStatuses);
    +
    +		return combinedJobStatusesFuture.thenApply(
    +			jobStatusesWithIds -> {
    +				List<JobID> jobsCreated = new LinkedList<>();
    +				List<JobID> jobsRunning = new LinkedList<>();
    +				List<JobID> jobsFinished = new LinkedList<>();
    +				List<JobID> jobsCancelling = new LinkedList<>();
    +				List<JobID> jobsCancelled = new LinkedList<>();
    +				List<JobID> jobsFailing = new LinkedList<>();
    +				List<JobID> jobsFailed = new LinkedList<>();
    +				List<JobID> jobsRestarting = new LinkedList<>();
    +				List<JobID> jobsSuspended = new LinkedList<>();
    +				List<JobID> jobsReconciling = new LinkedList<>();
    --- End diff --
    
    Not entirely sure, but I think that `ArrayList` would be better here. Even though we will most likely only add few elements per list, a linked list is probably a bit slower since `LinkedList` uses some more complex internal data structures.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    @tillrohrmann @zentol alright, thanks for the review and inputs. I'll address comments and change the `CurrentJobIdsHandler` to return all available Job IDs as a simple list as Till suggested.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    The additional job statuses are only new JSON fields. Not sure how the web UI handles them, but shouldn't the change be non-breaking?
    
    We could also revert b60466c for now if that will make things easier.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    @tillrohrmann yes, reusing `DispatcherGateway#requestJobDetails` was an option that did occur to me but was also not sure of the redundant cost. But sure, we can avoid adding yet another RPC for now.
    
    Thanks for the branch link. I'll rebase onto that.


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4734


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r143973446
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -242,6 +248,86 @@ public void start() throws Exception {
     	}
     
     	@Override
    +	public CompletableFuture<JobStatusesWithIdsOverview> requestJobIdsOverview(@RpcTimeout Time timeout) {
    +		final int numJobs = jobManagerRunners.size();
    +
    +		ArrayList<CompletableFuture<Tuple2<JobID, JobStatus>>> jobStatuses = new ArrayList<>(numJobs);
    +		for (Map.Entry<JobID, JobManagerRunner> jobManagerRunnerEntry : jobManagerRunners.entrySet()) {
    +			CompletableFuture<JobStatus> jobStatusFuture =
    +				jobManagerRunnerEntry.getValue().getJobManagerGateway().requestJobStatus(timeout);
    +
    +			jobStatuses.add(jobStatusFuture.thenApply(jobStatus -> Tuple2.of(jobManagerRunnerEntry.getKey(), jobStatus)));
    +		}
    +
    +		CompletableFuture<Collection<Tuple2<JobID, JobStatus>>> combinedJobStatusesFuture = FutureUtils.combineAll(jobStatuses);
    +
    +		return combinedJobStatusesFuture.thenApply(
    +			jobStatusesWithIds -> {
    +				List<JobID> jobsCreated = new LinkedList<>();
    +				List<JobID> jobsRunning = new LinkedList<>();
    +				List<JobID> jobsFinished = new LinkedList<>();
    +				List<JobID> jobsCancelling = new LinkedList<>();
    +				List<JobID> jobsCancelled = new LinkedList<>();
    +				List<JobID> jobsFailing = new LinkedList<>();
    +				List<JobID> jobsFailed = new LinkedList<>();
    +				List<JobID> jobsRestarting = new LinkedList<>();
    +				List<JobID> jobsSuspended = new LinkedList<>();
    +				List<JobID> jobsReconciling = new LinkedList<>();
    --- End diff --
    
    I think we might need some micro benchmarking here to be sure, as it may depend on the actual list to return. But I could stay with `ArrayList` (I think that was what we were using anyway.)


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r143996677
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusesWithIdsOverview.java ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.messages.webmonitor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import com.fasterxml.jackson.core.JsonParser;
    +import com.fasterxml.jackson.databind.DeserializationContext;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.SerializerProvider;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
    +import com.fasterxml.jackson.databind.ser.std.StdSerializer;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * An overview of how many jobs are in which status.
    + */
    +@JsonSerialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewSerializer.class)
    +@JsonDeserialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewDeserializer.class)
    +public class JobStatusesWithIdsOverview implements ResponseBody, InfoMessage {
    +
    +	private static final long serialVersionUID = -3699051943490133183L;
    +
    +	public static final String FIELD_NAME_JOBS_CREATED_IDS = "jobs-created";
    +	public static final String FIELD_NAME_JOBS_RUNNING_IDS = "jobs-running";
    +	public static final String FIELD_NAME_JOBS_FINISHED_IDS = "jobs-finished";
    +	public static final String FIELD_NAME_JOBS_CANCELLING_IDS = "jobs-cancelling";
    +	public static final String FIELD_NAME_JOBS_CANCELLED_IDS = "jobs-cancelled";
    +	public static final String FIELD_NAME_JOBS_FAILING_IDS = "jobs-failing";
    +	public static final String FIELD_NAME_JOBS_FAILED_IDS = "jobs-failed";
    +	public static final String FIELD_NAME_JOBS_RESTARTING_IDS = "jobs-restarting";
    +	public static final String FIELD_NAME_JOBS_SUSPENDED_IDS = "jobs-suspended";
    +	public static final String FIELD_NAME_JOBS_RECONCILING_IDS = "jobs-reconciling";
    +
    +	private final List<JobID> jobsCreated;
    +	private final List<JobID> jobsRunningOrPending;
    +	private final List<JobID> jobsFinished;
    +	private final List<JobID> jobsCancelling;
    +	private final List<JobID> jobsCancelled;
    +	private final List<JobID> jobsFailing;
    +	private final List<JobID> jobsFailed;
    +	private final List<JobID> jobsRestarting;
    +	private final List<JobID> jobsSuspended;
    +	private final List<JobID> jobsReconciling;
    +
    +	public JobStatusesWithIdsOverview(
    +			List<JobID> jobsCreated,
    +			List<JobID> jobsRunningOrPending,
    +			List<JobID> jobsFinished,
    +			List<JobID> jobsCancelling,
    +			List<JobID> jobsCancelled,
    +			List<JobID> jobsFailing,
    +			List<JobID> jobsFailed,
    +			List<JobID> jobsRestarting,
    +			List<JobID> jobsSuspended,
    +			List<JobID> jobsReconciling) {
    +
    +		this.jobsCreated = checkNotNull(jobsCreated);
    +		this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
    +		this.jobsFinished = checkNotNull(jobsFinished);
    +		this.jobsCancelling = checkNotNull(jobsCancelling);
    +		this.jobsCancelled = checkNotNull(jobsCancelled);
    +		this.jobsFailing = checkNotNull(jobsFailing);
    +		this.jobsFailed = checkNotNull(jobsFailed);
    +		this.jobsRestarting = checkNotNull(jobsRestarting);
    +		this.jobsSuspended = checkNotNull(jobsSuspended);
    +		this.jobsReconciling = checkNotNull(jobsReconciling);
    +	}
    +
    +	public JobStatusesWithIdsOverview(JobStatusesWithIdsOverview first, JobStatusesWithIdsOverview second) {
    +		this.jobsCreated = combine(first.getJobsCreated(), second.getJobsCreated());
    +		this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
    +		this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
    +		this.jobsCancelling = combine(first.getJobsCancelling(), second.getJobsCancelling());
    +		this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
    +		this.jobsFailing = combine(first.getJobsFailing(), second.getJobsFailing());
    +		this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
    +		this.jobsRestarting = combine(first.getJobsRestarting(), second.getJobsRestarting());
    +		this.jobsSuspended = combine(first.getJobsSuspended(), second.getJobsSuspended());
    +		this.jobsReconciling = combine(first.getJobsReconciling(), second.getJobsReconciling());
    +	}
    +
    +	public List<JobID> getJobsCreated() {
    +		return jobsCreated;
    +	}
    +
    +	public List<JobID> getJobsRunningOrPending() {
    +		return jobsRunningOrPending;
    +	}
    +
    +	public List<JobID> getJobsFinished() {
    +		return jobsFinished;
    +	}
    +
    +	public List<JobID> getJobsCancelling() {
    +		return jobsCancelling;
    +	}
    +
    +	public List<JobID> getJobsCancelled() {
    +		return jobsCancelled;
    +	}
    +
    +	public List<JobID> getJobsFailing() {
    +		return jobsFailing;
    +	}
    +
    +	public List<JobID> getJobsFailed() {
    +		return jobsFailed;
    +	}
    +
    +	public List<JobID> getJobsRestarting() {
    +		return jobsRestarting;
    +	}
    +
    +	public List<JobID> getJobsSuspended() {
    +		return jobsSuspended;
    +	}
    +
    +	public List<JobID> getJobsReconciling() {
    +		return jobsReconciling;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +
    +	@Override
    +	public int hashCode() {
    +		return jobsCreated.hashCode() ^
    +				jobsRunningOrPending.hashCode() ^
    +				jobsFinished.hashCode() ^
    +				jobsCancelling.hashCode() ^
    +				jobsCancelled.hashCode() ^
    +				jobsFailing.hashCode() ^
    +				jobsFailed.hashCode() ^
    +				jobsRestarting.hashCode() ^
    +				jobsSuspended.hashCode() ^
    +				jobsReconciling.hashCode();
    +	}
    +
    +	@Override
    +	public boolean equals(Object obj) {
    +		if (obj == this) {
    +			return true;
    +		}
    +		else if (obj instanceof JobStatusesWithIdsOverview) {
    +			JobStatusesWithIdsOverview that = (JobStatusesWithIdsOverview) obj;
    +			return this.jobsCreated.equals(that.jobsCreated) &&
    +					this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
    +					this.jobsFinished.equals(that.jobsFinished) &&
    +					this.jobsCancelling.equals(that.jobsCancelling) &&
    +					this.jobsCancelled.equals(that.jobsCancelled) &&
    +					this.jobsFailing.equals(that.jobsFailing) &&
    +					this.jobsFailed.equals(that.jobsFailed) &&
    +					this.jobsRestarting.equals(that.jobsRestarting) &&
    +					this.jobsSuspended.equals(that.jobsSuspended) &&
    +					this.jobsReconciling.equals(that.jobsReconciling);
    +		}
    +		else {
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "JobStatusesWithIdsOverview {" +
    +				"createdJobs=" + jobsCreated +
    +				", runningOrPendingJobs=" + jobsRunningOrPending +
    +				", finishedJobs=" + jobsFinished +
    +				", cancellingJobs=" + jobsCancelling +
    +				", cancelledJobs=" + jobsCancelled +
    +				", failingJobs=" + jobsFailing +
    +				", failedJobs=" + jobsFailed +
    +				", restartingJobs=" + jobsRestarting +
    +				", suspendedJobs=" + jobsSuspended +
    +				", reconcilingJobs=" + jobsReconciling +
    +				'}';
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Message serializers
    +	// ------------------------------------------------------------------------
    +
    +	public static final class JobStatusesWithIdsOverviewSerializer extends StdSerializer<JobStatusesWithIdsOverview> {
    --- End diff --
    
    Yes, then let's define a `JobID(De)Serializer` similar to the `JobVertexIDSerializer` and annote the respective field.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    @tzulitai, we actually figured out that we cannot register the `CurrentJobsOverviewHandler` under `/jobs/overview` because it conflicts with `/jobs/:jobid`. Therefore, we thought that we could register the `CurrentJobsOverviewHandler` under `/jobs`. This would then not only return the `JobIDs` but also more information about the job. In the future we might add an include query parameter with which one can select which information to return.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    @tillrohrmann the PR is now rebased. Previous comments have also been addressed.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    Maybe you could rebase your work on top of https://github.com/tillrohrmann/flink/tree/FLINK-7870. That's the branch where I stacked multiple of the pending handler PRs and which I will merge shortly after the 1.4 release branch has been cut.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    Thanks a lot @tzulitai. Will take another look at it and merge it if I have no other comments.


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r143718052
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -242,6 +248,86 @@ public void start() throws Exception {
     	}
     
     	@Override
    +	public CompletableFuture<JobStatusesWithIdsOverview> requestJobIdsOverview(@RpcTimeout Time timeout) {
    --- End diff --
    
    I think it would be better to define `requestJobsOverview` which simply returns a `Collection<JobOverview>` with `JobOverview` containing the `JobID` and the `JobStatus`, for example. All other data processing and data restructuring like grouping the individual jobs to make it easier for the Web UI to display the content, should then happen on the handler and not in the `Dispatcher`. My gut feeling is that this is unnecessary coupling of unrelated components.


---

[GitHub] flink pull request #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4734#discussion_r143718904
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusesWithIdsOverview.java ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.messages.webmonitor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import com.fasterxml.jackson.core.JsonParser;
    +import com.fasterxml.jackson.databind.DeserializationContext;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.SerializerProvider;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
    +import com.fasterxml.jackson.databind.ser.std.StdSerializer;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * An overview of how many jobs are in which status.
    + */
    +@JsonSerialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewSerializer.class)
    +@JsonDeserialize(using = JobStatusesWithIdsOverview.JobStatusesWithIdsOverviewDeserializer.class)
    +public class JobStatusesWithIdsOverview implements ResponseBody, InfoMessage {
    +
    +	private static final long serialVersionUID = -3699051943490133183L;
    +
    +	public static final String FIELD_NAME_JOBS_CREATED_IDS = "jobs-created";
    +	public static final String FIELD_NAME_JOBS_RUNNING_IDS = "jobs-running";
    +	public static final String FIELD_NAME_JOBS_FINISHED_IDS = "jobs-finished";
    +	public static final String FIELD_NAME_JOBS_CANCELLING_IDS = "jobs-cancelling";
    +	public static final String FIELD_NAME_JOBS_CANCELLED_IDS = "jobs-cancelled";
    +	public static final String FIELD_NAME_JOBS_FAILING_IDS = "jobs-failing";
    +	public static final String FIELD_NAME_JOBS_FAILED_IDS = "jobs-failed";
    +	public static final String FIELD_NAME_JOBS_RESTARTING_IDS = "jobs-restarting";
    +	public static final String FIELD_NAME_JOBS_SUSPENDED_IDS = "jobs-suspended";
    +	public static final String FIELD_NAME_JOBS_RECONCILING_IDS = "jobs-reconciling";
    +
    +	private final List<JobID> jobsCreated;
    +	private final List<JobID> jobsRunningOrPending;
    +	private final List<JobID> jobsFinished;
    +	private final List<JobID> jobsCancelling;
    +	private final List<JobID> jobsCancelled;
    +	private final List<JobID> jobsFailing;
    +	private final List<JobID> jobsFailed;
    +	private final List<JobID> jobsRestarting;
    +	private final List<JobID> jobsSuspended;
    +	private final List<JobID> jobsReconciling;
    +
    +	public JobStatusesWithIdsOverview(
    +			List<JobID> jobsCreated,
    +			List<JobID> jobsRunningOrPending,
    +			List<JobID> jobsFinished,
    +			List<JobID> jobsCancelling,
    +			List<JobID> jobsCancelled,
    +			List<JobID> jobsFailing,
    +			List<JobID> jobsFailed,
    +			List<JobID> jobsRestarting,
    +			List<JobID> jobsSuspended,
    +			List<JobID> jobsReconciling) {
    +
    +		this.jobsCreated = checkNotNull(jobsCreated);
    +		this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
    +		this.jobsFinished = checkNotNull(jobsFinished);
    +		this.jobsCancelling = checkNotNull(jobsCancelling);
    +		this.jobsCancelled = checkNotNull(jobsCancelled);
    +		this.jobsFailing = checkNotNull(jobsFailing);
    +		this.jobsFailed = checkNotNull(jobsFailed);
    +		this.jobsRestarting = checkNotNull(jobsRestarting);
    +		this.jobsSuspended = checkNotNull(jobsSuspended);
    +		this.jobsReconciling = checkNotNull(jobsReconciling);
    +	}
    +
    +	public JobStatusesWithIdsOverview(JobStatusesWithIdsOverview first, JobStatusesWithIdsOverview second) {
    +		this.jobsCreated = combine(first.getJobsCreated(), second.getJobsCreated());
    +		this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
    +		this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
    +		this.jobsCancelling = combine(first.getJobsCancelling(), second.getJobsCancelling());
    +		this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
    +		this.jobsFailing = combine(first.getJobsFailing(), second.getJobsFailing());
    +		this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
    +		this.jobsRestarting = combine(first.getJobsRestarting(), second.getJobsRestarting());
    +		this.jobsSuspended = combine(first.getJobsSuspended(), second.getJobsSuspended());
    +		this.jobsReconciling = combine(first.getJobsReconciling(), second.getJobsReconciling());
    +	}
    +
    +	public List<JobID> getJobsCreated() {
    +		return jobsCreated;
    +	}
    +
    +	public List<JobID> getJobsRunningOrPending() {
    +		return jobsRunningOrPending;
    +	}
    +
    +	public List<JobID> getJobsFinished() {
    +		return jobsFinished;
    +	}
    +
    +	public List<JobID> getJobsCancelling() {
    +		return jobsCancelling;
    +	}
    +
    +	public List<JobID> getJobsCancelled() {
    +		return jobsCancelled;
    +	}
    +
    +	public List<JobID> getJobsFailing() {
    +		return jobsFailing;
    +	}
    +
    +	public List<JobID> getJobsFailed() {
    +		return jobsFailed;
    +	}
    +
    +	public List<JobID> getJobsRestarting() {
    +		return jobsRestarting;
    +	}
    +
    +	public List<JobID> getJobsSuspended() {
    +		return jobsSuspended;
    +	}
    +
    +	public List<JobID> getJobsReconciling() {
    +		return jobsReconciling;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +
    +	@Override
    +	public int hashCode() {
    +		return jobsCreated.hashCode() ^
    +				jobsRunningOrPending.hashCode() ^
    +				jobsFinished.hashCode() ^
    +				jobsCancelling.hashCode() ^
    +				jobsCancelled.hashCode() ^
    +				jobsFailing.hashCode() ^
    +				jobsFailed.hashCode() ^
    +				jobsRestarting.hashCode() ^
    +				jobsSuspended.hashCode() ^
    +				jobsReconciling.hashCode();
    +	}
    +
    +	@Override
    +	public boolean equals(Object obj) {
    +		if (obj == this) {
    +			return true;
    +		}
    +		else if (obj instanceof JobStatusesWithIdsOverview) {
    +			JobStatusesWithIdsOverview that = (JobStatusesWithIdsOverview) obj;
    +			return this.jobsCreated.equals(that.jobsCreated) &&
    +					this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
    +					this.jobsFinished.equals(that.jobsFinished) &&
    +					this.jobsCancelling.equals(that.jobsCancelling) &&
    +					this.jobsCancelled.equals(that.jobsCancelled) &&
    +					this.jobsFailing.equals(that.jobsFailing) &&
    +					this.jobsFailed.equals(that.jobsFailed) &&
    +					this.jobsRestarting.equals(that.jobsRestarting) &&
    +					this.jobsSuspended.equals(that.jobsSuspended) &&
    +					this.jobsReconciling.equals(that.jobsReconciling);
    +		}
    +		else {
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "JobStatusesWithIdsOverview {" +
    +				"createdJobs=" + jobsCreated +
    +				", runningOrPendingJobs=" + jobsRunningOrPending +
    +				", finishedJobs=" + jobsFinished +
    +				", cancellingJobs=" + jobsCancelling +
    +				", cancelledJobs=" + jobsCancelled +
    +				", failingJobs=" + jobsFailing +
    +				", failedJobs=" + jobsFailed +
    +				", restartingJobs=" + jobsRestarting +
    +				", suspendedJobs=" + jobsSuspended +
    +				", reconcilingJobs=" + jobsReconciling +
    +				'}';
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Message serializers
    +	// ------------------------------------------------------------------------
    +
    +	public static final class JobStatusesWithIdsOverviewSerializer extends StdSerializer<JobStatusesWithIdsOverview> {
    --- End diff --
    
    Just out of curiosity, why did you define a (de)serializer for this class? The structure looks as if this could have been done automatically by the `ObjectMapper`.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    I agree with @zentol. Moreover, I would like to change `MultipleJobsDetails` to not split the job details into running and finished. Just a collection of `JobDetails`.


---

[GitHub] flink issue #4734: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new RES...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4734
  
    @tzulitai, I actually want to re-check whether it's indeed not possible to register the `CurrentJobsOverviewHandler` under `jobs/overview`. If this should indeed be the case, then it will most likely subsume this handler. If not, then we'll register this handler under `/jobs`


---