You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/10/02 17:55:50 UTC

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4763

    [FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

    ## What is the purpose of the change
    
    Adds the new `CheckpointStatisticDetailsHandler` for the new REST server endpoint.
    
    Moreover, this PR disables the `FAIL_ON_MISSING_CREATOR_PROPERTIES` property for the `RestMapperUtils.getStrictObjectMapper` because that is something the individuals beans can do on their own (e.g. by checking with `Preconditions.checkNotNull`).
    
    R @zentol because of the changes to the `ObjectMapper` setup.
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `CheckpointingStatisticsTest`.
    
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink portCheckpointStatsDetailsHandler

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4763.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4763
    
----
commit c09e579b26752f2ae477d60e8fbb6ccdce315df9
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-09-25T13:29:59Z

    [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers
    
    The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the latter, the former
    does not expect the AccessExecutionGraph to be the true ExecutionGraph. Instead it assumes
    it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache entries after
    a given time to live period. This will trigger requesting the AccessExecutionGraph again
    and, thus, updating the ExecutionGraph information for the ExecutionGraph based REST
    handlers.
    
    In order to avoid memory leaks, the WebRuntimeMonitor starts now a periodic cleanup task
    which triggers ExecutionGraphCache.cleanup. This methods releases all cache entries which
    have exceeded their time to live. Currently it is set to 20 * refreshInterval of the
    web gui.
    
    This closes #4728.

commit a4f9ef81c02738b40cf0ba375650684b46f5417d
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-09-26T16:39:15Z

    [FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint
    
    This closes #4737.

commit 4259fcc96c7c72644806941bd0df9f508a2f0bcd
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-09-28T16:35:50Z

    [FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint
    
    This commit implements the CheckpointConfigHandler which now returns a
    CheckpointConfigInfo object if checkpointing is enabled. In case that
    checkpointing is disabled for a job, it will return a 404 response.

commit 6f1756b541ad76521c7b653f834c1032c623f1e6
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-09-29T13:09:06Z

    [FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST endpoint
    
    This commit also makes the CheckpointStatsHistory object serializable by removing the
    CheckpointStatsHistoryIterable and replacing it with a static ArrayList.

commit bd8109b0af1a90fc32f76e57261a252762d678eb
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-02T17:39:38Z

    [FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

commit 4d694411e9e4b6c508c258655c9c69cb26ddb6be
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-02T17:52:13Z

    Disable failing when not all creator properties are known

----


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143415816
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java ---
    @@ -33,8 +33,7 @@
     		objectMapper.enable(
     			DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
     			DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
    -			DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
    -			DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
    --- End diff --
    
    Removing this also means we have to add explicit null checks to all existing Request-/ResponseBody classes.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143508371
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java ---
    @@ -33,8 +33,7 @@
     		objectMapper.enable(
     			DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
     			DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
    -			DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
    -			DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
    --- End diff --
    
    This is a valid objection which I share. I'll remove this change and set the map of `TaskCheckpointStatistics` to an empty map in case that we want to leave the details out.


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    @tillrohrmann I wanted to try it out, primarily since i can mark individual files as reviewed. For the remaining files I will once again write the comments on github.


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    
    
    
    
    Reviewed 24 of 31 files at r1.
    Review status: 21 of 26 files reviewed at latest revision, 2 unresolved discussions, some commit checks failed.
    
    ---
    
    *[flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java, line 44 at r1](https://reviewable.io:443/reviews/apache/flink/4763#-KvbgxjlkW5IZW5m8HYM:-KvbgxjlkW5IZW5m8HYN:bwa0bd4) ([raw file](https://github.com/apache/flink/blob/d152cf222422fd1565f4ea60fd6e8b62604a1dd1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java#L44)):*
    > ```Java
    > 	private final Cache<Long, AbstractCheckpointStats> cache;
    > 
    > 	public CheckpointStatsCache(int maxNumEntries) {
    > ```
    
    The CheckpointStatsCache should be moved out of the legacy namespace.
    
    ---
    
    *[flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java, line 233 at r1](https://reviewable.io:443/reviews/apache/flink/4763#-Kvbj3-UxgqUuhmsfJcA:-Kvbj3-VGSGSiPo7duwE:b-ryryhy) ([raw file](https://github.com/apache/flink/blob/d152cf222422fd1565f4ea60fd6e8b62604a1dd1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java#L233)):*
    > ```Java
    > 				}
    > 			} else {
    > 				checkpointStatisticsPerTask = Collections.emptyMap();
    > ```
    
    This method is near identical to `CheckpointingStatistics#generate...`, except this one line.
    
    ---
    
    
    *Comments from [Reviewable](https://reviewable.io:443/reviews/apache/flink/4763)*
    <!-- Sent from Reviewable.io -->



---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143499239
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    --- End diff --
    
    Good catch. Will add the checks.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143500886
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    --- End diff --
    
    Hmm I think all not checked parameters are actually primitives and, thus, don't need to be checked for null.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143415369
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    +		this.status = Preconditions.checkNotNull(status);
    +		this.savepoint = savepoint;
    +		this.triggerTimestamp = triggerTimestamp;
    +		this.latestAckTimestamp = latestAckTimestamp;
    +		this.stateSize = stateSize;
    +		this.duration = duration;
    +		this.alignmentBuffered = alignmentBuffered;
    +		this.numSubtasks = numSubtasks;
    +		this.numAckSubtasks = numAckSubtasks;
    +		this.checkpointStatisticsPerTask = checkpointStatisticsPerTask;
    +	}
    +
    +	public long getId() {
    +		return id;
    +	}
    +
    +	public CheckpointStatsStatus getStatus() {
    +		return status;
    +	}
    +
    +	public boolean isSavepoint() {
    +		return savepoint;
    +	}
    +
    +	public long getTriggerTimestamp() {
    +		return triggerTimestamp;
    +	}
    +
    +	public long getLatestAckTimestamp() {
    +		return latestAckTimestamp;
    +	}
    +
    +	public long getStateSize() {
    +		return stateSize;
    +	}
    +
    +	public long getDuration() {
    +		return duration;
    +	}
    +
    +	public long getAlignmentBuffered() {
    +		return alignmentBuffered;
    +	}
    +
    +	public int getNumSubtasks() {
    +		return numSubtasks;
    +	}
    +
    +	public int getNumAckSubtasks() {
    +		return numAckSubtasks;
    +	}
    +
    +	@Nullable
    +	public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() {
    +		return checkpointStatisticsPerTask;
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +		CheckpointStatistics that = (CheckpointStatistics) o;
    +		return id == that.id &&
    +			savepoint == that.savepoint &&
    +			triggerTimestamp == that.triggerTimestamp &&
    +			latestAckTimestamp == that.latestAckTimestamp &&
    +			stateSize == that.stateSize &&
    +			duration == that.duration &&
    +			alignmentBuffered == that.alignmentBuffered &&
    +			numSubtasks == that.numSubtasks &&
    +			numAckSubtasks == that.numAckSubtasks &&
    +			status == that.status &&
    +			Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask);
    +	}
    +
    +	public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) {
    +		if (checkpointStats != null) {
    +
    +			Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +			if (includeTaskCheckpointStatistics) {
    +				Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats();
    +
    +				checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size());
    +
    +				for (TaskStateStats taskStateStat : taskStateStats) {
    +					checkpointStatisticsPerTask.put(
    +						taskStateStat.getJobVertexId(),
    +						new TaskCheckpointStatistics(
    +							taskStateStat.getLatestAckTimestamp(),
    +							taskStateStat.getStateSize(),
    +							taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
    +							taskStateStat.getAlignmentBuffered(),
    +							taskStateStat.getNumberOfSubtasks(),
    +							taskStateStat.getNumberOfAcknowledgedSubtasks()));
    +				}
    +			} else {
    +				checkpointStatisticsPerTask = null;
    +			}
    +
    +			if (checkpointStats instanceof CompletedCheckpointStats) {
    +				final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.CompletedCheckpointStatistics(
    +					completedCheckpointStats.getCheckpointId(),
    +					completedCheckpointStats.getStatus(),
    +					completedCheckpointStats.getProperties().isSavepoint(),
    +					completedCheckpointStats.getTriggerTimestamp(),
    +					completedCheckpointStats.getLatestAckTimestamp(),
    +					completedCheckpointStats.getStateSize(),
    +					completedCheckpointStats.getEndToEndDuration(),
    +					completedCheckpointStats.getAlignmentBuffered(),
    +					completedCheckpointStats.getNumberOfSubtasks(),
    +					completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					completedCheckpointStats.getExternalPath(),
    +					completedCheckpointStats.isDiscarded());
    +			} else if (checkpointStats instanceof FailedCheckpointStats) {
    +				final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.FailedCheckpointStatistics(
    +					failedCheckpointStats.getCheckpointId(),
    +					failedCheckpointStats.getStatus(),
    +					failedCheckpointStats.getProperties().isSavepoint(),
    +					failedCheckpointStats.getTriggerTimestamp(),
    +					failedCheckpointStats.getLatestAckTimestamp(),
    +					failedCheckpointStats.getStateSize(),
    +					failedCheckpointStats.getEndToEndDuration(),
    +					failedCheckpointStats.getAlignmentBuffered(),
    +					failedCheckpointStats.getNumberOfSubtasks(),
    +					failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					failedCheckpointStats.getFailureTimestamp(),
    +					failedCheckpointStats.getFailureMessage());
    +			} else {
    +				throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
    +			}
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	/**
    +	 * Checkpoint statistics for a single task.
    +	 */
    +	public static final class TaskCheckpointStatistics {
    +
    +		public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +		public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +		public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +		public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +		public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +		@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +		private final long latestAckTimestamp;
    +
    +		@JsonProperty(FIELD_NAME_STATE_SIZE)
    +		private final long stateSize;
    +
    +		@JsonProperty(FIELD_NAME_DURATION)
    +		private final long duration;
    +
    +		@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +		private final long alignmentBuffered;
    +
    +		@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +		private final int numSubtasks;
    +
    +		@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +		private final int numAckSubtasks;
    +
    +		@JsonCreator
    +		public TaskCheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
    +			this.latestAckTimestamp = latestAckTimestamp;
    +			this.stateSize = stateSize;
    +			this.duration = duration;
    +			this.alignmentBuffered = alignmentBuffered;
    +			this.numSubtasks = numSubtasks;
    +			this.numAckSubtasks = numAckSubtasks;
    +		}
    +
    +		public long getLatestAckTimestamp() {
    +			return latestAckTimestamp;
    +		}
    +
    +		public long getStateSize() {
    +			return stateSize;
    +		}
    +
    +		public long getDuration() {
    +			return duration;
    +		}
    +
    +		public long getAlignmentBuffered() {
    +			return alignmentBuffered;
    +		}
    +
    +		public int getNumSubtasks() {
    +			return numSubtasks;
    +		}
    +
    +		public int getNumAckSubtasks() {
    +			return numAckSubtasks;
    +		}
    +
    +		@Override
    +		public boolean equals(Object o) {
    +			if (this == o) {
    +				return true;
    +			}
    +			if (o == null || getClass() != o.getClass()) {
    +				return false;
    +			}
    +			TaskCheckpointStatistics that = (TaskCheckpointStatistics) o;
    +			return latestAckTimestamp == that.latestAckTimestamp &&
    +				stateSize == that.stateSize &&
    +				duration == that.duration &&
    +				alignmentBuffered == that.alignmentBuffered &&
    +				numSubtasks == that.numSubtasks &&
    +				numAckSubtasks == that.numAckSubtasks;
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			return Objects.hash(latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
    +		}
    +	}
    +
    +	/**
    +	 * Statistics for a completed checkpoint.
    +	 */
    +	public static final class CompletedCheckpointStatistics extends CheckpointStatistics {
    +
    +		public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
    +
    +		public static final String FIELD_NAME_DISCARDED = "discarded";
    +
    +		@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
    +		@Nullable
    +		private final String externalPath;
    +
    +		@JsonProperty(FIELD_NAME_DISCARDED)
    +		private final boolean discarded;
    +
    +		@JsonCreator
    +		public CompletedCheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
    +			@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
    +			@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
    +			super(
    +				id,
    +				status,
    +				savepoint,
    +				triggerTimestamp,
    +				latestAckTimestamp,
    +				stateSize,
    +				duration,
    +				alignmentBuffered,
    +				numSubtasks,
    +				numAckSubtasks,
    +				checkpointingStatisticsPerTask);
    +
    +			this.externalPath = externalPath;
    --- End diff --
    
    missing null checks


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    Rebased onto the latest master


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    Rebasing onto the latest master.


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    no objections, feel free to merge this.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann closed the pull request at:

    https://github.com/apache/flink/pull/4763


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    I still have to look at the JSON generation in this PR.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143415293
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    +		this.status = Preconditions.checkNotNull(status);
    +		this.savepoint = savepoint;
    +		this.triggerTimestamp = triggerTimestamp;
    +		this.latestAckTimestamp = latestAckTimestamp;
    +		this.stateSize = stateSize;
    +		this.duration = duration;
    +		this.alignmentBuffered = alignmentBuffered;
    +		this.numSubtasks = numSubtasks;
    +		this.numAckSubtasks = numAckSubtasks;
    +		this.checkpointStatisticsPerTask = checkpointStatisticsPerTask;
    +	}
    +
    +	public long getId() {
    +		return id;
    +	}
    +
    +	public CheckpointStatsStatus getStatus() {
    +		return status;
    +	}
    +
    +	public boolean isSavepoint() {
    +		return savepoint;
    +	}
    +
    +	public long getTriggerTimestamp() {
    +		return triggerTimestamp;
    +	}
    +
    +	public long getLatestAckTimestamp() {
    +		return latestAckTimestamp;
    +	}
    +
    +	public long getStateSize() {
    +		return stateSize;
    +	}
    +
    +	public long getDuration() {
    +		return duration;
    +	}
    +
    +	public long getAlignmentBuffered() {
    +		return alignmentBuffered;
    +	}
    +
    +	public int getNumSubtasks() {
    +		return numSubtasks;
    +	}
    +
    +	public int getNumAckSubtasks() {
    +		return numAckSubtasks;
    +	}
    +
    +	@Nullable
    +	public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() {
    +		return checkpointStatisticsPerTask;
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +		CheckpointStatistics that = (CheckpointStatistics) o;
    +		return id == that.id &&
    +			savepoint == that.savepoint &&
    +			triggerTimestamp == that.triggerTimestamp &&
    +			latestAckTimestamp == that.latestAckTimestamp &&
    +			stateSize == that.stateSize &&
    +			duration == that.duration &&
    +			alignmentBuffered == that.alignmentBuffered &&
    +			numSubtasks == that.numSubtasks &&
    +			numAckSubtasks == that.numAckSubtasks &&
    +			status == that.status &&
    +			Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask);
    +	}
    +
    +	public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) {
    +		if (checkpointStats != null) {
    +
    +			Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +			if (includeTaskCheckpointStatistics) {
    +				Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats();
    +
    +				checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size());
    +
    +				for (TaskStateStats taskStateStat : taskStateStats) {
    +					checkpointStatisticsPerTask.put(
    +						taskStateStat.getJobVertexId(),
    +						new TaskCheckpointStatistics(
    +							taskStateStat.getLatestAckTimestamp(),
    +							taskStateStat.getStateSize(),
    +							taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
    +							taskStateStat.getAlignmentBuffered(),
    +							taskStateStat.getNumberOfSubtasks(),
    +							taskStateStat.getNumberOfAcknowledgedSubtasks()));
    +				}
    +			} else {
    +				checkpointStatisticsPerTask = null;
    +			}
    +
    +			if (checkpointStats instanceof CompletedCheckpointStats) {
    +				final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.CompletedCheckpointStatistics(
    +					completedCheckpointStats.getCheckpointId(),
    +					completedCheckpointStats.getStatus(),
    +					completedCheckpointStats.getProperties().isSavepoint(),
    +					completedCheckpointStats.getTriggerTimestamp(),
    +					completedCheckpointStats.getLatestAckTimestamp(),
    +					completedCheckpointStats.getStateSize(),
    +					completedCheckpointStats.getEndToEndDuration(),
    +					completedCheckpointStats.getAlignmentBuffered(),
    +					completedCheckpointStats.getNumberOfSubtasks(),
    +					completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					completedCheckpointStats.getExternalPath(),
    +					completedCheckpointStats.isDiscarded());
    +			} else if (checkpointStats instanceof FailedCheckpointStats) {
    +				final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.FailedCheckpointStatistics(
    +					failedCheckpointStats.getCheckpointId(),
    +					failedCheckpointStats.getStatus(),
    +					failedCheckpointStats.getProperties().isSavepoint(),
    +					failedCheckpointStats.getTriggerTimestamp(),
    +					failedCheckpointStats.getLatestAckTimestamp(),
    +					failedCheckpointStats.getStateSize(),
    +					failedCheckpointStats.getEndToEndDuration(),
    +					failedCheckpointStats.getAlignmentBuffered(),
    +					failedCheckpointStats.getNumberOfSubtasks(),
    +					failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					failedCheckpointStats.getFailureTimestamp(),
    +					failedCheckpointStats.getFailureMessage());
    +			} else {
    +				throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
    +			}
    +		} else {
    +			return null;
    --- End diff --
    
    pretty sure this would lead to a NullPointerException, since neither the CheckpointStatshandler, not AbstractExceutionGraphhandler, nor the AbstractRestHandler handle the case of the returned value being null.


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    Quick question @zentol. You've reviewed and approved #4772 which is based on this one. Do you have more comments for this PR?


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143662806
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    --- End diff --
    
    I will remove the Include.NON_NULL directive and re-add the FAIL_ON_MISSING_CREATOR_PROPERTY to the `ObjectMapper`.


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    Thanks for the review @zentol. I've addressed your comments: Reverted change to `RestMapperUtils`, removed null branch from `CheckpointStatistics#generateCheckpointStatistics`; Made the `CheckpointStatistics#checkpointStatisticsPerTask` non nullable. If you have no further objections, then I would like to merge this PR once Travis gives green light.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143502870
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    --- End diff --
    
    true, all objects are checked.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143489310
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java ---
    @@ -33,8 +33,7 @@
     		objectMapper.enable(
     			DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
     			DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
    -			DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
    -			DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
    --- End diff --
    
    Do note that we must be on the look-out for requests that use primitive fields, as jackson will default them to 0 if they are missing, which will cause misleading error messages.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143504804
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    +		this.status = Preconditions.checkNotNull(status);
    +		this.savepoint = savepoint;
    +		this.triggerTimestamp = triggerTimestamp;
    +		this.latestAckTimestamp = latestAckTimestamp;
    +		this.stateSize = stateSize;
    +		this.duration = duration;
    +		this.alignmentBuffered = alignmentBuffered;
    +		this.numSubtasks = numSubtasks;
    +		this.numAckSubtasks = numAckSubtasks;
    +		this.checkpointStatisticsPerTask = checkpointStatisticsPerTask;
    +	}
    +
    +	public long getId() {
    +		return id;
    +	}
    +
    +	public CheckpointStatsStatus getStatus() {
    +		return status;
    +	}
    +
    +	public boolean isSavepoint() {
    +		return savepoint;
    +	}
    +
    +	public long getTriggerTimestamp() {
    +		return triggerTimestamp;
    +	}
    +
    +	public long getLatestAckTimestamp() {
    +		return latestAckTimestamp;
    +	}
    +
    +	public long getStateSize() {
    +		return stateSize;
    +	}
    +
    +	public long getDuration() {
    +		return duration;
    +	}
    +
    +	public long getAlignmentBuffered() {
    +		return alignmentBuffered;
    +	}
    +
    +	public int getNumSubtasks() {
    +		return numSubtasks;
    +	}
    +
    +	public int getNumAckSubtasks() {
    +		return numAckSubtasks;
    +	}
    +
    +	@Nullable
    +	public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() {
    +		return checkpointStatisticsPerTask;
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +		CheckpointStatistics that = (CheckpointStatistics) o;
    +		return id == that.id &&
    +			savepoint == that.savepoint &&
    +			triggerTimestamp == that.triggerTimestamp &&
    +			latestAckTimestamp == that.latestAckTimestamp &&
    +			stateSize == that.stateSize &&
    +			duration == that.duration &&
    +			alignmentBuffered == that.alignmentBuffered &&
    +			numSubtasks == that.numSubtasks &&
    +			numAckSubtasks == that.numAckSubtasks &&
    +			status == that.status &&
    +			Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask);
    +	}
    +
    +	public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) {
    +		if (checkpointStats != null) {
    +
    +			Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +			if (includeTaskCheckpointStatistics) {
    +				Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats();
    +
    +				checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size());
    +
    +				for (TaskStateStats taskStateStat : taskStateStats) {
    +					checkpointStatisticsPerTask.put(
    +						taskStateStat.getJobVertexId(),
    +						new TaskCheckpointStatistics(
    +							taskStateStat.getLatestAckTimestamp(),
    +							taskStateStat.getStateSize(),
    +							taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
    +							taskStateStat.getAlignmentBuffered(),
    +							taskStateStat.getNumberOfSubtasks(),
    +							taskStateStat.getNumberOfAcknowledgedSubtasks()));
    +				}
    +			} else {
    +				checkpointStatisticsPerTask = null;
    +			}
    +
    +			if (checkpointStats instanceof CompletedCheckpointStats) {
    +				final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.CompletedCheckpointStatistics(
    +					completedCheckpointStats.getCheckpointId(),
    +					completedCheckpointStats.getStatus(),
    +					completedCheckpointStats.getProperties().isSavepoint(),
    +					completedCheckpointStats.getTriggerTimestamp(),
    +					completedCheckpointStats.getLatestAckTimestamp(),
    +					completedCheckpointStats.getStateSize(),
    +					completedCheckpointStats.getEndToEndDuration(),
    +					completedCheckpointStats.getAlignmentBuffered(),
    +					completedCheckpointStats.getNumberOfSubtasks(),
    +					completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					completedCheckpointStats.getExternalPath(),
    +					completedCheckpointStats.isDiscarded());
    +			} else if (checkpointStats instanceof FailedCheckpointStats) {
    +				final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.FailedCheckpointStatistics(
    +					failedCheckpointStats.getCheckpointId(),
    +					failedCheckpointStats.getStatus(),
    +					failedCheckpointStats.getProperties().isSavepoint(),
    +					failedCheckpointStats.getTriggerTimestamp(),
    +					failedCheckpointStats.getLatestAckTimestamp(),
    +					failedCheckpointStats.getStateSize(),
    +					failedCheckpointStats.getEndToEndDuration(),
    +					failedCheckpointStats.getAlignmentBuffered(),
    +					failedCheckpointStats.getNumberOfSubtasks(),
    +					failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					failedCheckpointStats.getFailureTimestamp(),
    +					failedCheckpointStats.getFailureMessage());
    +			} else {
    +				throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
    +			}
    +		} else {
    +			return null;
    --- End diff --
    
    I think it should actually work and output a serialized `null` value. However, I this distinction is not necessary and should be better pulled out of this method.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143415402
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    +		this.status = Preconditions.checkNotNull(status);
    +		this.savepoint = savepoint;
    +		this.triggerTimestamp = triggerTimestamp;
    +		this.latestAckTimestamp = latestAckTimestamp;
    +		this.stateSize = stateSize;
    +		this.duration = duration;
    +		this.alignmentBuffered = alignmentBuffered;
    +		this.numSubtasks = numSubtasks;
    +		this.numAckSubtasks = numAckSubtasks;
    +		this.checkpointStatisticsPerTask = checkpointStatisticsPerTask;
    +	}
    +
    +	public long getId() {
    +		return id;
    +	}
    +
    +	public CheckpointStatsStatus getStatus() {
    +		return status;
    +	}
    +
    +	public boolean isSavepoint() {
    +		return savepoint;
    +	}
    +
    +	public long getTriggerTimestamp() {
    +		return triggerTimestamp;
    +	}
    +
    +	public long getLatestAckTimestamp() {
    +		return latestAckTimestamp;
    +	}
    +
    +	public long getStateSize() {
    +		return stateSize;
    +	}
    +
    +	public long getDuration() {
    +		return duration;
    +	}
    +
    +	public long getAlignmentBuffered() {
    +		return alignmentBuffered;
    +	}
    +
    +	public int getNumSubtasks() {
    +		return numSubtasks;
    +	}
    +
    +	public int getNumAckSubtasks() {
    +		return numAckSubtasks;
    +	}
    +
    +	@Nullable
    +	public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() {
    +		return checkpointStatisticsPerTask;
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +		CheckpointStatistics that = (CheckpointStatistics) o;
    +		return id == that.id &&
    +			savepoint == that.savepoint &&
    +			triggerTimestamp == that.triggerTimestamp &&
    +			latestAckTimestamp == that.latestAckTimestamp &&
    +			stateSize == that.stateSize &&
    +			duration == that.duration &&
    +			alignmentBuffered == that.alignmentBuffered &&
    +			numSubtasks == that.numSubtasks &&
    +			numAckSubtasks == that.numAckSubtasks &&
    +			status == that.status &&
    +			Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask);
    +	}
    +
    +	public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) {
    +		if (checkpointStats != null) {
    +
    +			Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +			if (includeTaskCheckpointStatistics) {
    +				Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats();
    +
    +				checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size());
    +
    +				for (TaskStateStats taskStateStat : taskStateStats) {
    +					checkpointStatisticsPerTask.put(
    +						taskStateStat.getJobVertexId(),
    +						new TaskCheckpointStatistics(
    +							taskStateStat.getLatestAckTimestamp(),
    +							taskStateStat.getStateSize(),
    +							taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
    +							taskStateStat.getAlignmentBuffered(),
    +							taskStateStat.getNumberOfSubtasks(),
    +							taskStateStat.getNumberOfAcknowledgedSubtasks()));
    +				}
    +			} else {
    +				checkpointStatisticsPerTask = null;
    +			}
    +
    +			if (checkpointStats instanceof CompletedCheckpointStats) {
    +				final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.CompletedCheckpointStatistics(
    +					completedCheckpointStats.getCheckpointId(),
    +					completedCheckpointStats.getStatus(),
    +					completedCheckpointStats.getProperties().isSavepoint(),
    +					completedCheckpointStats.getTriggerTimestamp(),
    +					completedCheckpointStats.getLatestAckTimestamp(),
    +					completedCheckpointStats.getStateSize(),
    +					completedCheckpointStats.getEndToEndDuration(),
    +					completedCheckpointStats.getAlignmentBuffered(),
    +					completedCheckpointStats.getNumberOfSubtasks(),
    +					completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					completedCheckpointStats.getExternalPath(),
    +					completedCheckpointStats.isDiscarded());
    +			} else if (checkpointStats instanceof FailedCheckpointStats) {
    +				final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.FailedCheckpointStatistics(
    +					failedCheckpointStats.getCheckpointId(),
    +					failedCheckpointStats.getStatus(),
    +					failedCheckpointStats.getProperties().isSavepoint(),
    +					failedCheckpointStats.getTriggerTimestamp(),
    +					failedCheckpointStats.getLatestAckTimestamp(),
    +					failedCheckpointStats.getStateSize(),
    +					failedCheckpointStats.getEndToEndDuration(),
    +					failedCheckpointStats.getAlignmentBuffered(),
    +					failedCheckpointStats.getNumberOfSubtasks(),
    +					failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					failedCheckpointStats.getFailureTimestamp(),
    +					failedCheckpointStats.getFailureMessage());
    +			} else {
    +				throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
    +			}
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	/**
    +	 * Checkpoint statistics for a single task.
    +	 */
    +	public static final class TaskCheckpointStatistics {
    +
    +		public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +		public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +		public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +		public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +		public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +		@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +		private final long latestAckTimestamp;
    +
    +		@JsonProperty(FIELD_NAME_STATE_SIZE)
    +		private final long stateSize;
    +
    +		@JsonProperty(FIELD_NAME_DURATION)
    +		private final long duration;
    +
    +		@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +		private final long alignmentBuffered;
    +
    +		@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +		private final int numSubtasks;
    +
    +		@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +		private final int numAckSubtasks;
    +
    +		@JsonCreator
    +		public TaskCheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
    +			this.latestAckTimestamp = latestAckTimestamp;
    +			this.stateSize = stateSize;
    +			this.duration = duration;
    +			this.alignmentBuffered = alignmentBuffered;
    +			this.numSubtasks = numSubtasks;
    +			this.numAckSubtasks = numAckSubtasks;
    +		}
    +
    +		public long getLatestAckTimestamp() {
    +			return latestAckTimestamp;
    +		}
    +
    +		public long getStateSize() {
    +			return stateSize;
    +		}
    +
    +		public long getDuration() {
    +			return duration;
    +		}
    +
    +		public long getAlignmentBuffered() {
    +			return alignmentBuffered;
    +		}
    +
    +		public int getNumSubtasks() {
    +			return numSubtasks;
    +		}
    +
    +		public int getNumAckSubtasks() {
    +			return numAckSubtasks;
    +		}
    +
    +		@Override
    +		public boolean equals(Object o) {
    +			if (this == o) {
    +				return true;
    +			}
    +			if (o == null || getClass() != o.getClass()) {
    +				return false;
    +			}
    +			TaskCheckpointStatistics that = (TaskCheckpointStatistics) o;
    +			return latestAckTimestamp == that.latestAckTimestamp &&
    +				stateSize == that.stateSize &&
    +				duration == that.duration &&
    +				alignmentBuffered == that.alignmentBuffered &&
    +				numSubtasks == that.numSubtasks &&
    +				numAckSubtasks == that.numAckSubtasks;
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			return Objects.hash(latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
    +		}
    +	}
    +
    +	/**
    +	 * Statistics for a completed checkpoint.
    +	 */
    +	public static final class CompletedCheckpointStatistics extends CheckpointStatistics {
    +
    +		public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
    +
    +		public static final String FIELD_NAME_DISCARDED = "discarded";
    +
    +		@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
    +		@Nullable
    +		private final String externalPath;
    +
    +		@JsonProperty(FIELD_NAME_DISCARDED)
    +		private final boolean discarded;
    +
    +		@JsonCreator
    +		public CompletedCheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
    +			@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
    +			@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
    +			super(
    +				id,
    +				status,
    +				savepoint,
    +				triggerTimestamp,
    +				latestAckTimestamp,
    +				stateSize,
    +				duration,
    +				alignmentBuffered,
    +				numSubtasks,
    +				numAckSubtasks,
    +				checkpointingStatisticsPerTask);
    +
    +			this.externalPath = externalPath;
    +			this.discarded = discarded;
    +		}
    +
    +		@Nullable
    +		public String getExternalPath() {
    +			return externalPath;
    +		}
    +
    +		public boolean isDiscarded() {
    +			return discarded;
    +		}
    +
    +		@Override
    +		public boolean equals(Object o) {
    +			if (this == o) {
    +				return true;
    +			}
    +			if (o == null || getClass() != o.getClass()) {
    +				return false;
    +			}
    +			if (!super.equals(o)) {
    +				return false;
    +			}
    +			CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o;
    +			return discarded == that.discarded &&
    +				Objects.equals(externalPath, that.externalPath);
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			return Objects.hash(super.hashCode(), externalPath, discarded);
    +		}
    +	}
    +
    +	/**
    +	 * Statistics for a failed checkpoint.
    +	 */
    +	public static final class FailedCheckpointStatistics extends CheckpointStatistics {
    +
    +		public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp";
    +
    +		public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message";
    +
    +		@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
    +		private final long failureTimestamp;
    +
    +		@JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
    +		@Nullable
    +		private final String failureMessage;
    +
    +		@JsonCreator
    +		public FailedCheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
    +			@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
    +			@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
    +			super(
    +				id,
    +				status,
    +				savepoint,
    +				triggerTimestamp,
    +				latestAckTimestamp,
    +				stateSize,
    +				duration,
    +				alignmentBuffered,
    +				numSubtasks,
    +				numAckSubtasks,
    +				checkpointingStatisticsPerTask);
    +
    +			this.failureTimestamp = failureTimestamp;
    --- End diff --
    
    missing null checks


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    @zentol I'm not entirely sure whether reviewable makes things better or worse wrt reviewing. At least I can no longer respond to individual comments directly from github.
    
    You're right with both comments. I'll remove the `CheckpointingStatistics#generate` method. Moreover, I'll move the CheckpointStatsCache out of the legacy package.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143505801
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    +		this.status = Preconditions.checkNotNull(status);
    +		this.savepoint = savepoint;
    +		this.triggerTimestamp = triggerTimestamp;
    +		this.latestAckTimestamp = latestAckTimestamp;
    +		this.stateSize = stateSize;
    +		this.duration = duration;
    +		this.alignmentBuffered = alignmentBuffered;
    +		this.numSubtasks = numSubtasks;
    +		this.numAckSubtasks = numAckSubtasks;
    +		this.checkpointStatisticsPerTask = checkpointStatisticsPerTask;
    +	}
    +
    +	public long getId() {
    +		return id;
    +	}
    +
    +	public CheckpointStatsStatus getStatus() {
    +		return status;
    +	}
    +
    +	public boolean isSavepoint() {
    +		return savepoint;
    +	}
    +
    +	public long getTriggerTimestamp() {
    +		return triggerTimestamp;
    +	}
    +
    +	public long getLatestAckTimestamp() {
    +		return latestAckTimestamp;
    +	}
    +
    +	public long getStateSize() {
    +		return stateSize;
    +	}
    +
    +	public long getDuration() {
    +		return duration;
    +	}
    +
    +	public long getAlignmentBuffered() {
    +		return alignmentBuffered;
    +	}
    +
    +	public int getNumSubtasks() {
    +		return numSubtasks;
    +	}
    +
    +	public int getNumAckSubtasks() {
    +		return numAckSubtasks;
    +	}
    +
    +	@Nullable
    +	public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() {
    +		return checkpointStatisticsPerTask;
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +		CheckpointStatistics that = (CheckpointStatistics) o;
    +		return id == that.id &&
    +			savepoint == that.savepoint &&
    +			triggerTimestamp == that.triggerTimestamp &&
    +			latestAckTimestamp == that.latestAckTimestamp &&
    +			stateSize == that.stateSize &&
    +			duration == that.duration &&
    +			alignmentBuffered == that.alignmentBuffered &&
    +			numSubtasks == that.numSubtasks &&
    +			numAckSubtasks == that.numAckSubtasks &&
    +			status == that.status &&
    +			Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask);
    +	}
    +
    +	public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) {
    +		if (checkpointStats != null) {
    +
    +			Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +			if (includeTaskCheckpointStatistics) {
    +				Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats();
    +
    +				checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size());
    +
    +				for (TaskStateStats taskStateStat : taskStateStats) {
    +					checkpointStatisticsPerTask.put(
    +						taskStateStat.getJobVertexId(),
    +						new TaskCheckpointStatistics(
    +							taskStateStat.getLatestAckTimestamp(),
    +							taskStateStat.getStateSize(),
    +							taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
    +							taskStateStat.getAlignmentBuffered(),
    +							taskStateStat.getNumberOfSubtasks(),
    +							taskStateStat.getNumberOfAcknowledgedSubtasks()));
    +				}
    +			} else {
    +				checkpointStatisticsPerTask = null;
    +			}
    +
    +			if (checkpointStats instanceof CompletedCheckpointStats) {
    +				final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.CompletedCheckpointStatistics(
    +					completedCheckpointStats.getCheckpointId(),
    +					completedCheckpointStats.getStatus(),
    +					completedCheckpointStats.getProperties().isSavepoint(),
    +					completedCheckpointStats.getTriggerTimestamp(),
    +					completedCheckpointStats.getLatestAckTimestamp(),
    +					completedCheckpointStats.getStateSize(),
    +					completedCheckpointStats.getEndToEndDuration(),
    +					completedCheckpointStats.getAlignmentBuffered(),
    +					completedCheckpointStats.getNumberOfSubtasks(),
    +					completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					completedCheckpointStats.getExternalPath(),
    +					completedCheckpointStats.isDiscarded());
    +			} else if (checkpointStats instanceof FailedCheckpointStats) {
    +				final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
    +
    +				return new CheckpointStatistics.FailedCheckpointStatistics(
    +					failedCheckpointStats.getCheckpointId(),
    +					failedCheckpointStats.getStatus(),
    +					failedCheckpointStats.getProperties().isSavepoint(),
    +					failedCheckpointStats.getTriggerTimestamp(),
    +					failedCheckpointStats.getLatestAckTimestamp(),
    +					failedCheckpointStats.getStateSize(),
    +					failedCheckpointStats.getEndToEndDuration(),
    +					failedCheckpointStats.getAlignmentBuffered(),
    +					failedCheckpointStats.getNumberOfSubtasks(),
    +					failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +					checkpointStatisticsPerTask,
    +					failedCheckpointStats.getFailureTimestamp(),
    +					failedCheckpointStats.getFailureMessage());
    +			} else {
    +				throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
    +			}
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	/**
    +	 * Checkpoint statistics for a single task.
    +	 */
    +	public static final class TaskCheckpointStatistics {
    +
    +		public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +		public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +		public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +		public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +		public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +		@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +		private final long latestAckTimestamp;
    +
    +		@JsonProperty(FIELD_NAME_STATE_SIZE)
    +		private final long stateSize;
    +
    +		@JsonProperty(FIELD_NAME_DURATION)
    +		private final long duration;
    +
    +		@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +		private final long alignmentBuffered;
    +
    +		@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +		private final int numSubtasks;
    +
    +		@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +		private final int numAckSubtasks;
    +
    +		@JsonCreator
    +		public TaskCheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
    +			this.latestAckTimestamp = latestAckTimestamp;
    +			this.stateSize = stateSize;
    +			this.duration = duration;
    +			this.alignmentBuffered = alignmentBuffered;
    +			this.numSubtasks = numSubtasks;
    +			this.numAckSubtasks = numAckSubtasks;
    +		}
    +
    +		public long getLatestAckTimestamp() {
    +			return latestAckTimestamp;
    +		}
    +
    +		public long getStateSize() {
    +			return stateSize;
    +		}
    +
    +		public long getDuration() {
    +			return duration;
    +		}
    +
    +		public long getAlignmentBuffered() {
    +			return alignmentBuffered;
    +		}
    +
    +		public int getNumSubtasks() {
    +			return numSubtasks;
    +		}
    +
    +		public int getNumAckSubtasks() {
    +			return numAckSubtasks;
    +		}
    +
    +		@Override
    +		public boolean equals(Object o) {
    +			if (this == o) {
    +				return true;
    +			}
    +			if (o == null || getClass() != o.getClass()) {
    +				return false;
    +			}
    +			TaskCheckpointStatistics that = (TaskCheckpointStatistics) o;
    +			return latestAckTimestamp == that.latestAckTimestamp &&
    +				stateSize == that.stateSize &&
    +				duration == that.duration &&
    +				alignmentBuffered == that.alignmentBuffered &&
    +				numSubtasks == that.numSubtasks &&
    +				numAckSubtasks == that.numAckSubtasks;
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			return Objects.hash(latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
    +		}
    +	}
    +
    +	/**
    +	 * Statistics for a completed checkpoint.
    +	 */
    +	public static final class CompletedCheckpointStatistics extends CheckpointStatistics {
    +
    +		public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
    +
    +		public static final String FIELD_NAME_DISCARDED = "discarded";
    +
    +		@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
    +		@Nullable
    +		private final String externalPath;
    +
    +		@JsonProperty(FIELD_NAME_DISCARDED)
    +		private final boolean discarded;
    +
    +		@JsonCreator
    +		public CompletedCheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
    +			@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
    +			@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
    +			super(
    +				id,
    +				status,
    +				savepoint,
    +				triggerTimestamp,
    +				latestAckTimestamp,
    +				stateSize,
    +				duration,
    +				alignmentBuffered,
    +				numSubtasks,
    +				numAckSubtasks,
    +				checkpointingStatisticsPerTask);
    +
    +			this.externalPath = externalPath;
    +			this.discarded = discarded;
    +		}
    +
    +		@Nullable
    +		public String getExternalPath() {
    +			return externalPath;
    +		}
    +
    +		public boolean isDiscarded() {
    +			return discarded;
    +		}
    +
    +		@Override
    +		public boolean equals(Object o) {
    +			if (this == o) {
    +				return true;
    +			}
    +			if (o == null || getClass() != o.getClass()) {
    +				return false;
    +			}
    +			if (!super.equals(o)) {
    +				return false;
    +			}
    +			CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o;
    +			return discarded == that.discarded &&
    +				Objects.equals(externalPath, that.externalPath);
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			return Objects.hash(super.hashCode(), externalPath, discarded);
    +		}
    +	}
    +
    +	/**
    +	 * Statistics for a failed checkpoint.
    +	 */
    +	public static final class FailedCheckpointStatistics extends CheckpointStatistics {
    +
    +		public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp";
    +
    +		public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message";
    +
    +		@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
    +		private final long failureTimestamp;
    +
    +		@JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
    +		@Nullable
    +		private final String failureMessage;
    +
    +		@JsonCreator
    +		public FailedCheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
    +			@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
    +			@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
    +			super(
    +				id,
    +				status,
    +				savepoint,
    +				triggerTimestamp,
    +				latestAckTimestamp,
    +				stateSize,
    +				duration,
    +				alignmentBuffered,
    +				numSubtasks,
    +				numAckSubtasks,
    +				checkpointingStatisticsPerTask);
    +
    +			this.failureTimestamp = failureTimestamp;
    --- End diff --
    
    Should be a primitive.


---

[GitHub] flink issue #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler for ne...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4763
  
    Alright @zentol. I guess it would work if I signed up for reviewable.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143413201
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    --- End diff --
    
    We could annotate the particular field instead of the class.


---

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143412982
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +	public static final String FIELD_NAME_ID = "id";
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +
    +	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
    +
    +	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +
    +	public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
    +
    +	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +
    +	public static final String FIELD_NAME_TASKS = "tasks";
    +
    +	@JsonProperty(FIELD_NAME_ID)
    +	private final long id;
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final CheckpointStatsStatus status;
    +
    +	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +	private final boolean savepoint;
    +
    +	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +	private final long triggerTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +	private final long latestAckTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_STATE_SIZE)
    +	private final long stateSize;
    +
    +	@JsonProperty(FIELD_NAME_DURATION)
    +	private final long duration;
    +
    +	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +	private final long alignmentBuffered;
    +
    +	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +	private final int numSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +	private final int numAckSubtasks;
    +
    +	@JsonProperty(FIELD_NAME_TASKS)
    +	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +	@Nullable
    +	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
    +
    +	@JsonCreator
    +	private CheckpointStatistics(
    +			@JsonProperty(FIELD_NAME_ID) long id,
    +			@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
    +			@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
    +			@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
    +			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
    +			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +			@JsonProperty(FIELD_NAME_DURATION) long duration,
    +			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
    +			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
    +			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +		this.id = id;
    --- End diff --
    
    missing `checkNotNull` check for all fields but `checkpoitnStatisticsPerTask`.


---