Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/02/20 15:55:54 UTC

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3365

    [FLINK-5852] Move handler JSON generation code into static methods

    This PR is part of the History Server implementation. It is opened separately to make the review easier.
    
    The primary change is that the JSON generation for job-specific REST responses was moved out of the various ```handleRequest``` methods and into static methods, so that it can be reused outside of the request handlers. In addition, several refactorings have been made and tests were added.
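
A minimal, stdlib-only sketch of the pattern described above. It assumes nothing about Flink's actual handler classes. `StaticJsonSketch`, `JobInfo`, `register` and `createJobJson` are hypothetical names, and this `handleRequest` signature is not Flink's. Plain string concatenation stands in for Jackson's `JsonGenerator` and does no escaping. The request path only looks up the data and delegates. The static method depends on nothing but its argument.

```java
import java.util.HashMap;
import java.util.Map;

public class StaticJsonSketch {

    // Key names copied from a few entries of JsonUtils#Keys in this PR.
    static final String JOB_ID = "jid";
    static final String NAME = "name";
    static final String STATE = "state";

    // Hypothetical minimal view of a job: only what the JSON needs.
    public static final class JobInfo {
        final String id;
        final String name;
        final String state;

        public JobInfo(String id, String name, String state) {
            this.id = id;
            this.name = name;
            this.state = state;
        }
    }

    private final Map<String, JobInfo> jobs = new HashMap<>();

    public void register(JobInfo job) {
        jobs.put(job.id, job);
    }

    // Request path: look up the job, then delegate to the static generator.
    public String handleRequest(String jobId) {
        JobInfo job = jobs.get(jobId);
        return job == null ? "{}" : createJobJson(job);
    }

    // Static generation: depends only on its argument (no JSON escaping, for brevity).
    public static String createJobJson(JobInfo job) {
        return "{\"" + JOB_ID + "\":\"" + job.id + "\","
            + "\"" + NAME + "\":\"" + job.name + "\","
            + "\"" + STATE + "\":\"" + job.state + "\"}";
    }
}
```

Because `createJobJson` takes all of its input as an argument, a test or an archiving component can call it directly. It can then compare the result with what the handler returns.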
    
    Other changes include:
    * added a utility method ```JsonUtils#addIOMetrics``` to aggregate ```IOMetrics```
    * added a utility method ```JsonUtils#writeIOMetrics``` to write ```IOMetrics```
    * added a utility method ```JsonUtils#writeMinMaxAvg``` to write ```MinMaxAvgStats``` (checkpointing-related)
    * replaced **job-related** hard-coded JSON keys with static constants, defined in ```JsonUtils#Keys```
    * added an additional constructor to each ```Archived*``` class for easier generation in tests
    * added ```BuilderUtils``` class for easier generation of ```Archived*``` classes in tests
    * modified ```IOMetrics``` to allow subclassing without requiring the use of ```Meter```s
    * added a test for every introduced static method
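
The `JsonUtils` diff quoted later in this thread shows the aggregation rule of `JsonUtils#addIOMetrics`, sketched here in plain Java. Attempts in a terminal state contribute the final metrics stored with them, if any. Running attempts contribute live values, and missing live values count as zero. This is a simplified sketch. The `Metrics` class, the `terminal` flag and the `Map`-based live store, including its key names, are hypothetical stand-ins for `IOMetrics`, `ExecutionState` and the `MetricFetcher`/`MetricStore` lookup.

```java
import java.util.Map;

public class AddIOMetricsSketch {

    // Simplified stand-in for IOMetrics / MutableIOMetrics: five summable counters.
    public static final class Metrics {
        public long bytesInLocal;
        public long bytesInRemote;
        public long bytesOut;
        public long recordsIn;
        public long recordsOut;

        public Metrics(long bytesInLocal, long bytesInRemote, long bytesOut,
                       long recordsIn, long recordsOut) {
            this.bytesInLocal = bytesInLocal;
            this.bytesInRemote = bytesInRemote;
            this.bytesOut = bytesOut;
            this.recordsIn = recordsIn;
            this.recordsOut = recordsOut;
        }
    }

    // Adds one subtask attempt's counters to 'summed'.
    // terminal:     whether the attempt has reached a terminal state
    // finalMetrics: metrics stored with the finished attempt (may be null)
    // live:         hypothetical live metric values keyed by metric name (may be null)
    public static void addIOMetrics(Metrics summed, boolean terminal,
                                    Metrics finalMetrics, Map<String, String> live) {
        if (terminal) {
            if (finalMetrics != null) {
                summed.bytesInLocal += finalMetrics.bytesInLocal;
                summed.bytesInRemote += finalMetrics.bytesInRemote;
                summed.bytesOut += finalMetrics.bytesOut;
                summed.recordsIn += finalMetrics.recordsIn;
                summed.recordsOut += finalMetrics.recordsOut;
            }
        } else if (live != null) {
            // Missing live values count as 0, like getMetric(name, "0") in the diff.
            summed.bytesInLocal += liveValue(live, "numBytesInLocal");
            summed.bytesInRemote += liveValue(live, "numBytesInRemote");
            summed.bytesOut += liveValue(live, "numBytesOut");
            summed.recordsIn += liveValue(live, "numRecordsIn");
            summed.recordsOut += liveValue(live, "numRecordsOut");
        }
    }

    private static long liveValue(Map<String, String> live, String name) {
        return Long.parseLong(live.getOrDefault(name, "0"));
    }
}
```

Calling `addIOMetrics` once per subtask attempt with a single zero-initialized accumulator yields the per-vertex totals. For a terminal attempt the live store is ignored even when it is supplied.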

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 5852_static_json

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3365.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3365
    
----
commit 5369cf678c252aa5988ca33c68ab79560ff6cd41
Author: zentol <ch...@apache.org>
Date:   2017-02-13T14:41:29Z

    [FLINK-5852] Move handler JSON generation code into static methods

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103712282
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class MutableIOMetrics extends IOMetrics {
    +
    +	private static final long serialVersionUID = -5460777634971381737L;
    +
    +	public MutableIOMetrics() {
    +		super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
    +	}
    +
    +	public void addNumBytesInLocal(long toAdd) {
    +		this.numBytesInLocal += toAdd;
    +	}
    +
    +	public void addNumBytesInRemote(long toAdd) {
    +		this.numBytesInRemote += toAdd;
    +	}
    +
    +	public void addNumBytesOut(long toAdd) {
    +		this.numBytesOut += toAdd;
    +	}
    +
    +	public void addNumRecordsIn(long toAdd) {
    +		this.numRecordsIn += toAdd;
    +	}
    +
    +	public void addNumRecordsOut(long toAdd) {
    +		this.numRecordsOut += toAdd;
    +	}
    +
    +	public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
    --- End diff --
    
    yup.



[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103431658
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    +		public static final String TASKMANAGERS = "taskmanagers";
    +		public static final String JOB_ID = "jid";
    +		public static final String ID = "id";
    +		public static final String NAME = "name";
    +		public static final String STATE = "state";
    +		public static final String IS_STOPPABLE = "isStoppable";
    +		public static final String PARALLELISM = "parallelism";
    +		public static final String PLAN = "plan";
    +
    +		public static final String START_TIME = "start-time";
    +		public static final String END_TIME = "end-time";
    +		public static final String DURATION = "duration";
    +		public static final String NOW = "now";
    +		public static final String LAST_MODIFICATION = "last-modification";
    +
    +		public static final String TIMESTAMP = "timestamp";
    +		public static final String TIMESTAMPS = "timestamps";
    +		public static final String STATUS_COUNTS = "status-counts";
    +
    +		public static final String REFRESH_INTERVAL = "refresh-interval";
    +		public static final String TIMEZONE_OFFSET = "timezone-offset";
    +		public static final String TIMEZONE_NAME = "timezone-name";
    +		public static final String FLINK_VERSION = "flink-version";
    +		public static final String FLINK_REVISION = "flink-revision";
    +
    +		public static final String EXECUTION_CONFIG = "execution-config";
    +		public static final String MODE = "mode";
    +		public static final String EXECUTION_MODE = "execution-mode";
    +		public static final String RESTART_STRATEGY = "restart-strategy";
    +		public static final String JOB_PARALLELISM = "job-parallelism";
    +		public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
    +		public static final String USER_CONFIG = "user-config";
    +
    +		public static final String ROOT_EXCEPTION = "root-exception";
    +		public static final String ALL_EXCEPTIONS = "all-exceptions";
    +		public static final String EXCEPTION = "exception";
    +		public static final String TRUNCATED = "truncated";
    +
    +		public static final String HOST = "host";
    +		public static final String LOCATION = "location";
    +
    +		public static final String VERTICES = "vertices";
    +		public static final String TASKS = "tasks";
    +		public static final String TASK = "task";
    +		public static final String SUBTASKS = "subtasks";
    +		public static final String SUBTASK = "subtask";
    +		public static final String ATTEMPT = "attempt";
    +
    +		public static final String STATUS = "status";
    +		public static final String TOTAL = "total";
    +		public static final String PENDING = "pending";
    +		public static final String RUNNING = "running";
    +		public static final String FINISHED = "finished";
    +		public static final String CANCELING = "canceling";
    +		public static final String CANCELED = "canceled";
    +		public static final String FAILED = "failed";
    +		public static final String RESTORED = "restored";
    +		public static final String PENDING_OR_FAILED = "pending_or_failed";
    +		public static final String DISCARDED = "discarded";
    +		public static final String IN_PROGRESS = "in_progress";
    +		public static final String COMPLETED = "completed";
    +
    +		public static final String METRICS = "metrics";
    +		public static final String WRITE_BYTES = "write-bytes";
    +		public static final String READ_BYTES = "read-bytes";
    +		public static final String WRITE_RECORDS = "write-records";
    +		public static final String READ_RECORDS = "read-records";
    +		public static final String TYPE = "type";
    +		public static final String VALUE = "value";
    +
    +		public static final String MIN = "min";
    +		public static final String MAX = "max";
    +		public static final String AVG = "avg";
    +
    +		public static final String JOB_ACCUMULATORS = "job-accumulators";
    +		public static final String USER_ACCUMULATORS = "user-accumulators";
    +		public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +		
    +		public static final String COUNTS = "counts";
    +		public static final String EXTERNALIZATION = "externalization";
    +		public static final String EXTERNAL_PATH = "external-path";
    +		public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
    +		public static final String HISTORY = "history";
    +
    +		public static final String SUMMARY = "summary";
    +		public static final String STATE_SIZE = "state_size";
    +		public static final String ETE_DURATION = "end_to_end_duration";
    +		public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
    +		public static final String SAVEPOINT = "savepoint";
    +		public static final String IS_SAVEPOINT = "is_savepoint";
    +		public static final String CHECKPOINT = "checkpoint";
    +		public static final String CHECKPOINT_DURATION = "checkpoint_duration";
    +		public static final String SYNC = "sync";
    +		public static final String ASYNC = "async";
    +		public static final String ALIGNMENT = "alignment";
    +		public static final String BUFFERED = "buffered";
    +		
    +		public static final String LATEST = "latest";
    +		
    +		public static final String FAILURE_TIMESTAMP = "failure_timestamp";
    +		public static final String FAILURE_MESSAGE = "failure_message";
    +		public static final String RESTORE_TIMESTAMP = "restore_timestamp";
    +		
    +		public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
    +		public static final String ACK_TIMESTAMP = "ack_timestamp";
    +		public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +		
    +		public static final String NUM_SUBTASKS = "num_subtasks";
    +		public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +		public static final String INDEX = "index";
    +		public static final String INTERVAL = "interval";
    +		public static final String ENABLED = "enabled";
    +		public static final String TIMEOUT = "timeout";
    +		public static final String MIN_PAUSE = "min_pause";
    +		public static final String MAX_CONCURRENT = "max_concurrent";
    +
    +		private Keys() {
    +		}
    +	}
    +
    +	public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
    +		CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
    +	}
    +
    +	public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
    +		gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
    +		gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
    +		gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
    +	}
    +
    +	public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
    +		if (state.isTerminal()) {
    +			if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
    +				summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
    +				summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
    +				summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
    +				summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
    +				summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
    +			}
    +		} else { // execAttempt is still running, use MetricQueryService instead
    +			if (fetcher != null) {
    +				fetcher.update();
    +				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex);
    +				if (metrics != null) {
    +					summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")));
    +					summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")));
    +					summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")));
    +					summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")));
    +					summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")));
    +				}
    +			}
    +		}
    +	}
    +
    +	public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException {
    --- End diff --
    
    Move this to `MutableIOMetrics`
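
One way the suggestion could look, sketched under assumptions. The counters and `add*` methods follow the `MutableIOMetrics` diff above, and the key names come from `JsonUtils#Keys`. `MutableIOMetricsSketch` and `writeIOMetricsAsJson` are hypothetical. The method returns the `"metrics"` field as a string rather than writing it to a Jackson `JsonGenerator`. As in `writeIOMetrics`, read bytes are the sum of local and remote input bytes.

```java
public class MutableIOMetricsSketch {

    // JSON keys as defined in JsonUtils#Keys in the diff.
    static final String METRICS = "metrics";
    static final String READ_BYTES = "read-bytes";
    static final String WRITE_BYTES = "write-bytes";
    static final String READ_RECORDS = "read-records";
    static final String WRITE_RECORDS = "write-records";

    private long numBytesInLocal;
    private long numBytesInRemote;
    private long numBytesOut;
    private long numRecordsIn;
    private long numRecordsOut;

    public void addNumBytesInLocal(long toAdd) { numBytesInLocal += toAdd; }
    public void addNumBytesInRemote(long toAdd) { numBytesInRemote += toAdd; }
    public void addNumBytesOut(long toAdd) { numBytesOut += toAdd; }
    public void addNumRecordsIn(long toAdd) { numRecordsIn += toAdd; }
    public void addNumRecordsOut(long toAdd) { numRecordsOut += toAdd; }

    // Instance version of writeIOMetrics: the object serializes its own counters
    // as a "metrics" field (to be embedded in an enclosing JSON object).
    public String writeIOMetricsAsJson() {
        return "\"" + METRICS + "\":{"
            + "\"" + READ_BYTES + "\":" + (numBytesInLocal + numBytesInRemote) + ","
            + "\"" + WRITE_BYTES + "\":" + numBytesOut + ","
            + "\"" + READ_RECORDS + "\":" + numRecordsIn + ","
            + "\"" + WRITE_RECORDS + "\":" + numRecordsOut + "}";
    }
}
```

Keeping the serialization next to the counters means callers only need the accumulated object, not a separate utility method.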



[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103433823
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/BuilderUtils.java ---
    @@ -0,0 +1,436 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import org.apache.flink.api.common.ArchivedExecutionConfig;
    +import org.apache.flink.api.common.ExecutionMode;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.MeterView;
    +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecution;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.runtime.util.EvictingBoundedList;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.SerializedValue;
    +
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +
    +public class BuilderUtils {
    +	private static final Random RANDOM = new Random();
    +
    +	private BuilderUtils() {
    +	}
    +
    +	public static ArchivedExecutionConfigBuilder createArchivedExecutionConfig() {
    +		return new ArchivedExecutionConfigBuilder();
    +	}
    +
    +	public static class ArchivedExecutionConfigBuilder {
    +		private String executionMode;
    +		private String restartStrategyDescription;
    +		private int parallelism;
    +		private boolean objectReuseEnabled;
    +		private Map<String, String> globalJobParameters;
    +
    +		private ArchivedExecutionConfigBuilder() {
    +		}
    +
    +		public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) {
    +			this.executionMode = executionMode;
    +			return this;
    +		}
    +
    +		public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription) {
    +			this.restartStrategyDescription = restartStrategyDescription;
    +			return this;
    +		}
    +
    +		public ArchivedExecutionConfigBuilder setParallelism(int parallelism) {
    +			this.parallelism = parallelism;
    +			return this;
    +		}
    +
    +		public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) {
    +			this.objectReuseEnabled = objectReuseEnabled;
    +			return this;
    +		}
    +
    +		public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters) {
    +			this.globalJobParameters = globalJobParameters;
    +			return this;
    +		}
    +
    +		public ArchivedExecutionConfig finish() {
    --- End diff --
    
    I think the conventional name for the method that creates the instance from a builder is `build()` (this applies to the other builders as well).
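
A sketch of the naming convention being suggested, in which the builder's terminal method is `build()`. `BuilderSketch`, `ConfigSnapshot` and the default values are hypothetical. The fields mirror those of `ArchivedExecutionConfigBuilder` above. The new `ArchivedExecutionConfig` constructor is not shown in full in this thread, so the sketch builds its own immutable value class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class BuilderSketch {

    // Hypothetical immutable value class standing in for ArchivedExecutionConfig.
    public static final class ConfigSnapshot {
        public final String executionMode;
        public final String restartStrategyDescription;
        public final int parallelism;
        public final boolean objectReuseEnabled;
        public final Map<String, String> globalJobParameters;

        private ConfigSnapshot(Builder builder) {
            this.executionMode = builder.executionMode;
            this.restartStrategyDescription = builder.restartStrategyDescription;
            this.parallelism = builder.parallelism;
            this.objectReuseEnabled = builder.objectReuseEnabled;
            // Defensive copy so later changes to the caller's map do not leak in.
            this.globalJobParameters =
                Collections.unmodifiableMap(new HashMap<>(builder.globalJobParameters));
        }
    }

    public static final class Builder {
        // Hypothetical defaults, chosen so that build() never sees null fields.
        private String executionMode = "PIPELINED";
        private String restartStrategyDescription = "default";
        private int parallelism = 1;
        private boolean objectReuseEnabled = false;
        private Map<String, String> globalJobParameters = Collections.emptyMap();

        public Builder setExecutionMode(String executionMode) {
            this.executionMode = executionMode;
            return this;
        }

        public Builder setRestartStrategyDescription(String restartStrategyDescription) {
            this.restartStrategyDescription = restartStrategyDescription;
            return this;
        }

        public Builder setParallelism(int parallelism) {
            this.parallelism = parallelism;
            return this;
        }

        public Builder setObjectReuseEnabled(boolean objectReuseEnabled) {
            this.objectReuseEnabled = objectReuseEnabled;
            return this;
        }

        // A null map is not supported here.
        public Builder setGlobalJobParameters(Map<String, String> globalJobParameters) {
            this.globalJobParameters = globalJobParameters;
            return this;
        }

        // Terminal method, conventionally named build().
        public ConfigSnapshot build() {
            return new ConfigSnapshot(this);
        }
    }
}
```

Usage: `new BuilderSketch.Builder().setParallelism(4).setObjectReuseEnabled(true).build()`.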



[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103433357
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +	public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException {
    +		gen.writeObjectFieldStart(Keys.METRICS);
    +		gen.writeNumberField(Keys.READ_BYTES, metrics.getNumBytesInLocal() + metrics.getNumBytesInRemote());
    +		gen.writeNumberField(Keys.WRITE_BYTES, metrics.getNumBytesOut());
    +		gen.writeNumberField(Keys.READ_RECORDS, metrics.getNumRecordsIn());
    +		gen.writeNumberField(Keys.WRITE_RECORDS, metrics.getNumRecordsOut());
    +		gen.writeEndObject();
    +	}
    +
    +	public static class MutableIOMetrics extends IOMetrics {
    --- End diff --
    
    Should we also rename this to make clear that it is only used for the web frontend? With the other methods in it, the class no longer fits its name :smile:
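The aggregation in `addIOMetrics` above comes down to one decision per subtask. A finished attempt contributes the final metrics stored in the ExecutionGraph. A running attempt contributes whatever the MetricQueryService currently reports, and missing values count as 0. The following is a simplified, self-contained sketch of that logic, not the PR's code. `FinalIOMetrics`, `SummedIOMetrics` and the metric-name strings are hypothetical stand-ins for `IOMetrics`, `MutableIOMetrics` and the `MetricNames` constants. The live metrics are modeled as a plain map instead of going through a `MetricFetcher`.

```java
import java.util.Map;

/** Hypothetical stand-in for IOMetrics: the final values of a finished attempt. */
class FinalIOMetrics {
    final long bytesInLocal;
    final long bytesInRemote;
    final long bytesOut;
    final long recordsIn;
    final long recordsOut;

    FinalIOMetrics(long bytesInLocal, long bytesInRemote, long bytesOut, long recordsIn, long recordsOut) {
        this.bytesInLocal = bytesInLocal;
        this.bytesInRemote = bytesInRemote;
        this.bytesOut = bytesOut;
        this.recordsIn = recordsIn;
        this.recordsOut = recordsOut;
    }
}

/** Hypothetical accumulator playing the role of MutableIOMetrics. */
class SummedIOMetrics {
    long bytesInLocal;
    long bytesInRemote;
    long bytesOut;
    long recordsIn;
    long recordsOut;

    /**
     * Adds the metrics of one subtask attempt. A terminal attempt contributes its
     * final metrics, if present; a running attempt contributes its live metric
     * values, where a missing metric counts as 0.
     */
    void add(boolean terminal, FinalIOMetrics finalMetrics, Map<String, String> liveMetrics) {
        if (terminal) {
            if (finalMetrics != null) {
                bytesInLocal += finalMetrics.bytesInLocal;
                bytesInRemote += finalMetrics.bytesInRemote;
                bytesOut += finalMetrics.bytesOut;
                recordsIn += finalMetrics.recordsIn;
                recordsOut += finalMetrics.recordsOut;
            }
        } else if (liveMetrics != null) {
            // Metric names are assumed here; the real code uses MetricNames constants.
            bytesInLocal += live(liveMetrics, "numBytesInLocal");
            bytesInRemote += live(liveMetrics, "numBytesInRemote");
            bytesOut += live(liveMetrics, "numBytesOut");
            recordsIn += live(liveMetrics, "numRecordsIn");
            recordsOut += live(liveMetrics, "numRecordsOut");
        }
    }

    private static long live(Map<String, String> metrics, String name) {
        return Long.parseLong(metrics.getOrDefault(name, "0"));
    }

    /** Corresponds to "read-bytes" in writeIOMetrics: local plus remote bytes. */
    long readBytes() {
        return bytesInLocal + bytesInRemote;
    }
}
```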


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103461677
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    --- End diff --
    
    To change this I would have to touch the runtime Execution[[Job]Vertex] classes, which I would rather not do in this PR.


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103429562
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java ---
    @@ -36,20 +38,23 @@ public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
     
     	@Override
     	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
    -		StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
    -		
    +		return createVertexAccumulatorsJson(jobVertex);
    +	}
    +	public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
    --- End diff --
    
    Empty line missing before method


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103434413
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/JsonTestUtils.java ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonFactory;
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ArrayNode;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecution;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecution;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +
    +import java.net.InetAddress;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class JsonTestUtils {
    +	public static final ObjectMapper mapper = new ObjectMapper();
    +	public static final JsonFactory jacksonFactory = new JsonFactory()
    +		.enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
    +		.disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
    +
    +	private static ArchivedExecutionGraph originalJob;
    +	private static ArchivedExecutionJobVertex originalTask;
    +	private static ArchivedExecutionVertex originalSubtask;
    +	private static ArchivedExecution originalAttempt;
    +
    +	private static final Object lock = new Object();
    +
    +	private JsonTestUtils() {
    +	}
    +
    +	public static AccessExecutionGraph getTestJob() throws Exception {
    +		synchronized (lock) {
    +			if (originalJob == null) {
    +				generateObjects();
    +			}
    +		}
    +		return originalJob;
    +	}
    +
    +	public static AccessExecutionJobVertex getTestTask() throws Exception {
    +		synchronized (lock) {
    +			if (originalJob == null) {
    +				generateObjects();
    +			}
    +		}
    +		return originalTask;
    +	}
    +
    +	public static AccessExecutionVertex getTestSubtask() throws Exception {
    +		synchronized (lock) {
    +			if (originalJob == null) {
    +				generateObjects();
    +			}
    +		}
    +		return originalSubtask;
    +	}
    +
    +	public static AccessExecution getTestAttempt() throws Exception {
    +		synchronized (lock) {
    +			if (originalJob == null) {
    +				generateObjects();
    +			}
    +		}
    +		return originalAttempt;
    +	}
    +
    +	private static void generateObjects() throws Exception {
    --- End diff --
    
    Very generic name that should have a comment.
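One way to address this is sketched below with hypothetical names. The method gets a name that says what it builds, and a comment documents why all getters share one lazily created hierarchy. `TestFixtures`, `createArchivedJobHierarchy` and the string stand-ins for the `Archived*` objects are assumptions for illustration, not names from the PR.

```java
/** Hypothetical holder for lazily created, shared test fixtures. */
final class TestFixtures {
    private static final Object LOCK = new Object();

    // Strings stand in for the ArchivedExecutionGraph / ArchivedExecutionJobVertex objects.
    private static String job;
    private static String task;
    private static int creations;

    private TestFixtures() {
    }

    /**
     * Builds one archived job together with the task that belongs to it (in the
     * real utility also a subtask and an execution attempt), so that all getters
     * return objects from the same hierarchy. Must be called while holding LOCK.
     */
    private static void createArchivedJobHierarchy() {
        creations++;
        job = "job";
        task = "task of " + job;
    }

    static String getTestJob() {
        synchronized (LOCK) {
            if (job == null) {
                createArchivedJobHierarchy();
            }
            return job;
        }
    }

    static String getTestTask() {
        synchronized (LOCK) {
            if (job == null) {
                createArchivedJobHierarchy();
            }
            return task;
        }
    }

    static int creations() {
        synchronized (LOCK) {
            return creations;
        }
    }
}
```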


---

[GitHub] flink issue #3365: [FLINK-5852] Move handler JSON generation code into stati...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3365
  
    @uce I've removed the ```JsonUtils``` class.


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103473529
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    --- End diff --
    
    What do you mean? If IntelliJ is to be trusted, the other constructor is used only once, in `ExecutionConfig.archive`. But I'm fine with leaving it as is.


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103430333
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    --- End diff --
    
    Why is this not simply kept in the respective handler where it is relevant?
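A minimal sketch of what keeping the limit in the handler could look like. The constant becomes a private detail next to the only code that uses it. The class name and the selection logic are assumptions: exceptions are reported up to the limit, and the "truncated" flag is set only when at least one was left out. Strings stand in for the stringified task exceptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: the limit is a private detail of the exceptions handler. */
final class JobExceptionsSketch {
    /** Maximum number of task exceptions included in the response. */
    private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;

    /** The exceptions to report and whether any were left out ("truncated"). */
    static final class Selection {
        final List<String> reported;
        final boolean truncated;

        Selection(List<String> reported, boolean truncated) {
            this.reported = reported;
            this.truncated = truncated;
        }
    }

    private JobExceptionsSketch() {
    }

    static Selection select(List<String> allExceptions) {
        List<String> reported = new ArrayList<>();
        for (String exception : allExceptions) {
            if (reported.size() >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
                // At least one more exception exists beyond the limit.
                return new Selection(reported, true);
            }
            reported.add(exception);
        }
        return new Selection(reported, false);
    }
}
```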


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103431060
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    +		public static final String TASKMANAGERS = "taskmanagers";
    +		public static final String JOB_ID = "jid";
    +		public static final String ID = "id";
    +		public static final String NAME = "name";
    +		public static final String STATE = "state";
    +		public static final String IS_STOPPABLE = "isStoppable";
    +		public static final String PARALLELISM = "parallelism";
    +		public static final String PLAN = "plan";
    +
    +		public static final String START_TIME = "start-time";
    +		public static final String END_TIME = "end-time";
    +		public static final String DURATION = "duration";
    +		public static final String NOW = "now";
    +		public static final String LAST_MODIFICATION = "last-modification";
    +
    +		public static final String TIMESTAMP = "timestamp";
    +		public static final String TIMESTAMPS = "timestamps";
    +		public static final String STATUS_COUNTS = "status-counts";
    +
    +		public static final String REFRESH_INTERVAL = "refresh-interval";
    +		public static final String TIMEZONE_OFFSET = "timezone-offset";
    +		public static final String TIMEZONE_NAME = "timezone-name";
    +		public static final String FLINK_VERSION = "flink-version";
    +		public static final String FLINK_REVISION = "flink-revision";
    +
    +		public static final String EXECUTION_CONFIG = "execution-config";
    +		public static final String MODE = "mode";
    +		public static final String EXECUTION_MODE = "execution-mode";
    +		public static final String RESTART_STRATEGY = "restart-strategy";
    +		public static final String JOB_PARALLELISM = "job-parallelism";
    +		public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
    +		public static final String USER_CONFIG = "user-config";
    +
    +		public static final String ROOT_EXCEPTION = "root-exception";
    +		public static final String ALL_EXCEPTIONS = "all-exceptions";
    +		public static final String EXCEPTION = "exception";
    +		public static final String TRUNCATED = "truncated";
    +
    +		public static final String HOST = "host";
    +		public static final String LOCATION = "location";
    +
    +		public static final String VERTICES = "vertices";
    +		public static final String TASKS = "tasks";
    +		public static final String TASK = "task";
    +		public static final String SUBTASKS = "subtasks";
    +		public static final String SUBTASK = "subtask";
    +		public static final String ATTEMPT = "attempt";
    +
    +		public static final String STATUS = "status";
    +		public static final String TOTAL = "total";
    +		public static final String PENDING = "pending";
    +		public static final String RUNNING = "running";
    +		public static final String FINISHED = "finished";
    +		public static final String CANCELING = "canceling";
    +		public static final String CANCELED = "canceled";
    +		public static final String FAILED = "failed";
    +		public static final String RESTORED = "restored";
    +		public static final String PENDING_OR_FAILED = "pending_or_failed";
    +		public static final String DISCARDED = "discarded";
    +		public static final String IN_PROGRESS = "in_progress";
    +		public static final String COMPLETED = "completed";
    +
    +		public static final String METRICS = "metrics";
    +		public static final String WRITE_BYTES = "write-bytes";
    +		public static final String READ_BYTES = "read-bytes";
    +		public static final String WRITE_RECORDS = "write-records";
    +		public static final String READ_RECORDS = "read-records";
    +		public static final String TYPE = "type";
    +		public static final String VALUE = "value";
    +
    +		public static final String MIN = "min";
    +		public static final String MAX = "max";
    +		public static final String AVG = "avg";
    +
    +		public static final String JOB_ACCUMULATORS = "job-accumulators";
    +		public static final String USER_ACCUMULATORS = "user-accumulators";
    +		public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +		
    +		public static final String COUNTS = "counts";
    +		public static final String EXTERNALIZATION = "externalization";
    +		public static final String EXTERNAL_PATH = "external-path";
    +		public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
    +		public static final String HISTORY = "history";
    +
    +		public static final String SUMMARY = "summary";
    +		public static final String STATE_SIZE = "state_size";
    +		public static final String ETE_DURATION = "end_to_end_duration";
    +		public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
    +		public static final String SAVEPOINT = "savepoint";
    +		public static final String IS_SAVEPOINT = "is_savepoint";
    +		public static final String CHECKPOINT = "checkpoint";
    +		public static final String CHECKPOINT_DURATION = "checkpoint_duration";
    +		public static final String SYNC = "sync";
    +		public static final String ASYNC = "async";
    +		public static final String ALIGNMENT = "alignment";
    +		public static final String BUFFERED = "buffered";
    +		
    +		public static final String LATEST = "latest";
    +		
    +		public static final String FAILURE_TIMESTAMP = "failure_timestamp";
    +		public static final String FAILURE_MESSAGE = "failure_message";
    +		public static final String RESTORE_TIMESTAMP = "restore_timestamp";
    +		
    +		public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
    +		public static final String ACK_TIMESTAMP = "ack_timestamp";
    +		public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +		
    +		public static final String NUM_SUBTASKS = "num_subtasks";
    +		public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +		public static final String INDEX = "index";
    +		public static final String INTERVAL = "interval";
    +		public static final String ENABLED = "enabled";
    +		public static final String TIMEOUT = "timeout";
    +		public static final String MIN_PAUSE = "min_pause";
    +		public static final String MAX_CONCURRENT = "max_concurrent";
    +
    +		private Keys() {
    +		}
    +	}
    +
    +	public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
    +		CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
    +	}
    +
    +	public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
    --- End diff --
    
    I would move this back to the respective handler package and make it package-private or public there.
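The suggestion amounts to a helper with no access modifier, which makes it package-private, living next to the checkpoint handlers that call it. The sketch below assumes hypothetical names (`MinMaxAvg`, `CheckpointStatsJson`). To stay self-contained it builds the three fields with string concatenation instead of Jackson's `JsonGenerator`. The keys "min", "max" and "avg" are the ones defined in the diff.

```java
/** Hypothetical stand-in for MinMaxAvgStats. */
final class MinMaxAvg {
    final long min;
    final long max;
    final long avg;

    MinMaxAvg(long min, long max, long avg) {
        this.min = min;
        this.max = max;
        this.avg = avg;
    }
}

/** Hypothetical helper living in the checkpoint handler package. */
final class CheckpointStatsJson {
    private CheckpointStatsJson() {
    }

    // No access modifier: package-private, visible only to the handlers in this package.
    static String minMaxAvgFields(MinMaxAvg stats) {
        return "\"min\":" + stats.min + ",\"max\":" + stats.max + ",\"avg\":" + stats.avg;
    }
}
```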


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103425747
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    --- End diff --
    
    I think this constructor should be the only one. The other `ArchivedExecutionConfig(ExecutionConfig)` constructor should become a static helper method, e.g. `static ArchivedExecutionConfig createFromExecutionConfig(ExecutionConfig)`, that calls the constructor.
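A sketch of the suggested shape: one constructor that takes plain values, plus a static factory that extracts those values from the live config. `SimpleExecutionConfig` and `ArchivedConfigSketch` are hypothetical stand-ins for `ExecutionConfig` and `ArchivedExecutionConfig`, with fields instead of getters. The defensive copy of the parameter map is an added assumption, not something the review asks for.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, trimmed-down stand-in for ExecutionConfig. */
class SimpleExecutionConfig {
    String executionMode = "PIPELINED";
    String restartStrategyDescription = "none";
    int parallelism = 1;
    boolean objectReuseEnabled = false;
    Map<String, String> globalJobParameters = new HashMap<>();
}

/** Sketch: one constructor taking plain values, plus a static factory method. */
final class ArchivedConfigSketch {
    final String executionMode;
    final String restartStrategyDescription;
    final int parallelism;
    final boolean objectReuseEnabled;
    final Map<String, String> globalJobParameters;

    ArchivedConfigSketch(
            String executionMode,
            String restartStrategyDescription,
            int parallelism,
            boolean objectReuseEnabled,
            Map<String, String> globalJobParameters) {
        this.executionMode = executionMode;
        this.restartStrategyDescription = restartStrategyDescription;
        this.parallelism = parallelism;
        this.objectReuseEnabled = objectReuseEnabled;
        // Read-only copy so later changes to the live config don't leak into the archive.
        this.globalJobParameters = Collections.unmodifiableMap(new HashMap<>(globalJobParameters));
    }

    /** Takes over the job of the former ArchivedExecutionConfig(ExecutionConfig) constructor. */
    static ArchivedConfigSketch createFromExecutionConfig(SimpleExecutionConfig ec) {
        return new ArchivedConfigSketch(
                ec.executionMode,
                ec.restartStrategyDescription,
                ec.parallelism,
                ec.objectReuseEnabled,
                ec.globalJobParameters);
    }
}
```

Tests can then call the plain constructor directly without building an `ExecutionConfig` first.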


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3365


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103425958
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    +			String executionMode,
    +			String restartStrategyDescription,
    +			int parallelism,
    +			boolean objectReuseEnabled,
    +			Map<String, String> globalJobParameters) {
    +		this.executionMode = executionMode;
    --- End diff --
    
    Let's add `checkNotNull` checks where applicable and `checkArgument(parallelism >= 1)`.
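A minimal sketch of the requested validation. It assumes `executionMode`, `restartStrategyDescription` and `globalJobParameters` must all be non-null. Flink code would typically use `checkNotNull`/`checkArgument` from `org.apache.flink.util.Preconditions`. To keep the example self-contained, it uses `java.util.Objects.requireNonNull` and a local `checkArgument` helper instead. The class name `ValidatedArchivedConfig` is hypothetical.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of the archived config constructor with argument checks. */
final class ValidatedArchivedConfig {
    final String executionMode;
    final String restartStrategyDescription;
    final int parallelism;
    final boolean objectReuseEnabled;
    final Map<String, String> globalJobParameters;

    ValidatedArchivedConfig(
            String executionMode,
            String restartStrategyDescription,
            int parallelism,
            boolean objectReuseEnabled,
            Map<String, String> globalJobParameters) {
        this.executionMode = Objects.requireNonNull(executionMode, "executionMode");
        this.restartStrategyDescription =
                Objects.requireNonNull(restartStrategyDescription, "restartStrategyDescription");
        checkArgument(parallelism >= 1, "parallelism must be at least 1");
        this.parallelism = parallelism;
        this.objectReuseEnabled = objectReuseEnabled;
        this.globalJobParameters = Objects.requireNonNull(globalJobParameters, "globalJobParameters");
    }

    /** Local stand-in for Preconditions.checkArgument. */
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```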


---

[GitHub] flink issue #3365: [FLINK-5852] Move handler JSON generation code into stati...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3365
  
    @uce Thank you for the review. I've addressed all the issues you raised, either with further comments or with changes.


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103453996
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---
    @@ -41,19 +43,23 @@ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
     
     	@Override
     	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
    +		return createJobExceptionsJson(graph);
    +	}
    +
    +	public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
     		StringWriter writer = new StringWriter();
     		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
     
     		gen.writeStartObject();
     		
     		// most important is the root failure cause
     		String rootException = graph.getFailureCauseAsString();
    -		if (rootException != null) {
    -			gen.writeStringField("root-exception", rootException);
    +		if (!rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
    --- End diff --
    
    We could have both checks, but ExecutionGraph#archive() guarantees a non-null value.
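The sentinel approach discussed here works as follows. The archiving side never stores null; it stores a fixed "stringified null" string instead. The handler then compares against that string rather than against null. The sketch assumes a sentinel value of `"(null)"` and hypothetical class and method names. Flink's `ExceptionUtils` defines its own constant, and the example does not rely on its exact value. `hasRootException` shows the "both" variant, which also tolerates a null input.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical sketch of the "stringified null" sentinel approach. */
final class RootExceptionSketch {
    /** Assumed sentinel value; Flink's ExceptionUtils defines the real constant. */
    static final String STRINGIFIED_NULL_EXCEPTION = "(null)";

    private RootExceptionSketch() {
    }

    /** Archive side: never store null, store the sentinel instead. */
    static String stringify(Throwable t) {
        if (t == null) {
            return STRINGIFIED_NULL_EXCEPTION;
        }
        StringWriter out = new StringWriter();
        PrintWriter writer = new PrintWriter(out);
        t.printStackTrace(writer);
        writer.flush();
        return out.toString();
    }

    /** Handler side: checks for both null and the sentinel. */
    static boolean hasRootException(String stringified) {
        return stringified != null && !STRINGIFIED_NULL_EXCEPTION.equals(stringified);
    }
}
```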


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103464724
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    --- End diff --
    
    Then let's discuss it now, before I go through every handler and test again.
    
    The REST API is loaded with inconsistencies with regard to how things are structured and named. I believe this is partly because there is no central place where keys are stored.
    
    If every handler rolls its own keys in a vacuum, an inconsistent API is inevitable. With this class, however, a handler that doesn't use it sticks out compared to the other handlers. My hope is that this is easier to notice during reviews, leading the contributor to the JsonUtils class and possibly to reusing existing keys instead of adding another set.
    
    I do agree that readability suffers a bit when comparing ```JsonUtils.Keys.STATUS``` to ```"status"```, but frankly I think we can refactor JsonUtils and its usages to allow a plain ```STATUS```. With that, I don't see an issue.
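A minimal sketch of the central-keys idea. All field names of a response come from one constants class, so a handler that hard-codes its own strings stands out in review. `RestKeys`, `JobTimesJsonSketch` and the response shape are hypothetical, and JSON is built by concatenation instead of Jackson to keep the example self-contained. With the class layout from this diff, a static import such as `import static org.apache.flink.runtime.webmonitor.utils.JsonUtils.Keys.STATUS;` would let handlers write a plain `STATUS`. The sketch qualifies the names only because it lives in a single file.

```java
/** Hypothetical central registry of REST response keys (small subset). */
final class RestKeys {
    static final String STATUS = "status";
    static final String START_TIME = "start-time";
    static final String END_TIME = "end-time";
    static final String DURATION = "duration";

    private RestKeys() {
    }
}

/** Example handler that takes every field name from RestKeys. */
final class JobTimesJsonSketch {
    private JobTimesJsonSketch() {
    }

    static String timesJson(String status, long start, long end) {
        return "{\"" + RestKeys.STATUS + "\":\"" + status + "\","
                + "\"" + RestKeys.START_TIME + "\":" + start + ","
                + "\"" + RestKeys.END_TIME + "\":" + end + ","
                + "\"" + RestKeys.DURATION + "\":" + (end - start) + "}";
    }
}
```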


---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103431005
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    +		public static final String TASKMANAGERS = "taskmanagers";
    +		public static final String JOB_ID = "jid";
    +		public static final String ID = "id";
    +		public static final String NAME = "name";
    +		public static final String STATE = "state";
    +		public static final String IS_STOPPABLE = "isStoppable";
    +		public static final String PARALLELISM = "parallelism";
    +		public static final String PLAN = "plan";
    +
    +		public static final String START_TIME = "start-time";
    +		public static final String END_TIME = "end-time";
    +		public static final String DURATION = "duration";
    +		public static final String NOW = "now";
    +		public static final String LAST_MODIFICATION = "last-modification";
    +
    +		public static final String TIMESTAMP = "timestamp";
    +		public static final String TIMESTAMPS = "timestamps";
    +		public static final String STATUS_COUNTS = "status-counts";
    +
    +		public static final String REFRESH_INTERVAL = "refresh-interval";
    +		public static final String TIMEZONE_OFFSET = "timezone-offset";
    +		public static final String TIMEZONE_NAME = "timezone-name";
    +		public static final String FLINK_VERSION = "flink-version";
    +		public static final String FLINK_REVISION = "flink-revision";
    +
    +		public static final String EXECUTION_CONFIG = "execution-config";
    +		public static final String MODE = "mode";
    +		public static final String EXECUTION_MODE = "execution-mode";
    +		public static final String RESTART_STRATEGY = "restart-strategy";
    +		public static final String JOB_PARALLELISM = "job-parallelism";
    +		public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
    +		public static final String USER_CONFIG = "user-config";
    +
    +		public static final String ROOT_EXCEPTION = "root-exception";
    +		public static final String ALL_EXCEPTIONS = "all-exceptions";
    +		public static final String EXCEPTION = "exception";
    +		public static final String TRUNCATED = "truncated";
    +
    +		public static final String HOST = "host";
    +		public static final String LOCATION = "location";
    +
    +		public static final String VERTICES = "vertices";
    +		public static final String TASKS = "tasks";
    +		public static final String TASK = "task";
    +		public static final String SUBTASKS = "subtasks";
    +		public static final String SUBTASK = "subtask";
    +		public static final String ATTEMPT = "attempt";
    +
    +		public static final String STATUS = "status";
    +		public static final String TOTAL = "total";
    +		public static final String PENDING = "pending";
    +		public static final String RUNNING = "running";
    +		public static final String FINISHED = "finished";
    +		public static final String CANCELING = "canceling";
    +		public static final String CANCELED = "canceled";
    +		public static final String FAILED = "failed";
    +		public static final String RESTORED = "restored";
    +		public static final String PENDING_OR_FAILED = "pending_or_failed";
    +		public static final String DISCARDED = "discarded";
    +		public static final String IN_PROGRESS = "in_progress";
    +		public static final String COMPLETED = "completed";
    +
    +		public static final String METRICS = "metrics";
    +		public static final String WRITE_BYTES = "write-bytes";
    +		public static final String READ_BYTES = "read-bytes";
    +		public static final String WRITE_RECORDS = "write-records";
    +		public static final String READ_RECORDS = "read-records";
    +		public static final String TYPE = "type";
    +		public static final String VALUE = "value";
    +
    +		public static final String MIN = "min";
    +		public static final String MAX = "max";
    +		public static final String AVG = "avg";
    +
    +		public static final String JOB_ACCUMULATORS = "job-accumulators";
    +		public static final String USER_ACCUMULATORS = "user-accumulators";
    +		public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +		
    +		public static final String COUNTS = "counts";
    +		public static final String EXTERNALIZATION = "externalization";
    +		public static final String EXTERNAL_PATH = "external-path";
    +		public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
    +		public static final String HISTORY = "history";
    +
    +		public static final String SUMMARY = "summary";
    +		public static final String STATE_SIZE = "state_size";
    +		public static final String ETE_DURATION = "end_to_end_duration";
    +		public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
    +		public static final String SAVEPOINT = "savepoint";
    +		public static final String IS_SAVEPOINT = "is_savepoint";
    +		public static final String CHECKPOINT = "checkpoint";
    +		public static final String CHECKPOINT_DURATION = "checkpoint_duration";
    +		public static final String SYNC = "sync";
    +		public static final String ASYNC = "async";
    +		public static final String ALIGNMENT = "alignment";
    +		public static final String BUFFERED = "buffered";
    +		
    +		public static final String LATEST = "latest";
    +		
    +		public static final String FAILURE_TIMESTAMP = "failure_timestamp";
    +		public static final String FAILURE_MESSAGE = "failure_message";
    +		public static final String RESTORE_TIMESTAMP = "restore_timestamp";
    +		
    +		public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
    +		public static final String ACK_TIMESTAMP = "ack_timestamp";
    +		public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +		
    +		public static final String NUM_SUBTASKS = "num_subtasks";
    +		public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +		public static final String INDEX = "index";
    +		public static final String INTERVAL = "interval";
    +		public static final String ENABLED = "enabled";
    +		public static final String TIMEOUT = "timeout";
    +		public static final String MIN_PAUSE = "min_pause";
    +		public static final String MAX_CONCURRENT = "max_concurrent";
    +
    +		private Keys() {
    +		}
    +	}
    +
    +	public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
    --- End diff --
    
    Unused


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103463182
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    +			String executionMode,
    +			String restartStrategyDescription,
    +			int parallelism,
    +			boolean objectReuseEnabled,
    +			Map<String, String> globalJobParameters) {
    +		this.executionMode = executionMode;
    --- End diff --
    
    That would lead to maintainability issues. The ArchivedExecutionConfig's very purpose is to truthfully represent some state X. To do so, it must only impose the restrictions that the original object (i.e. the ExecutionConfig) imposed; otherwise we risk that an ExecutionConfig can't be archived, which doesn't make sense. This in turn would require that any change to the conditions the EC imposes be propagated to the ArchivedEC, which simply won't happen.
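    To make the maintainability argument concrete, here is a JDK-only sketch with hypothetical classes `LiveConfigSketch` and `ArchivedConfigSketch`. It assumes, purely for illustration, that the live object accepts `-1` as a "use the default" parallelism sentinel. A stricter archive constructor that added its own `parallelism > 0` check would refuse to archive such an object, which is the kind of drift described above; the lenient constructor simply records what it is given.
    
    ```java
    // Hypothetical live object: accepts -1 as a "use default" sentinel (assumption for illustration).
    final class LiveConfigSketch {
        static final int PARALLELISM_DEFAULT = -1;
        private int parallelism = PARALLELISM_DEFAULT;
    
        void setParallelism(int parallelism) {
            if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
                throw new IllegalArgumentException("parallelism must be at least 1 or the default sentinel");
            }
            this.parallelism = parallelism;
        }
    
        int getParallelism() {
            return parallelism;
        }
    }
    
    // Hypothetical archived copy: records values without re-validating them.
    final class ArchivedConfigSketch {
        final int parallelism;
    
        // Copy constructor used when archiving a live object.
        ArchivedConfigSketch(LiveConfigSketch live) {
            this(live.getParallelism());
        }
    
        // Field-based constructor (e.g. for tests); deliberately imposes no checks of its own.
        ArchivedConfigSketch(int parallelism) {
            this.parallelism = parallelism;
        }
    
        // A stricter variant for comparison: it rejects a state the live object allows.
        static ArchivedConfigSketch strict(int parallelism) {
            if (parallelism <= 0) {
                throw new IllegalArgumentException("parallelism must be positive");
            }
            return new ArchivedConfigSketch(parallelism);
        }
    }
    ```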


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103479406
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    +			String executionMode,
    +			String restartStrategyDescription,
    +			int parallelism,
    +			boolean objectReuseEnabled,
    +			Map<String, String> globalJobParameters) {
    +		this.executionMode = executionMode;
    --- End diff --
    
    The checks would only throw exceptions if the current behaviour changes, most probably because of an unintended change. Anyway, feel free to leave it as is.


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103425168
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    --- End diff --
    
    Class is missing `serialVersionUID`
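    As an illustration of the fix, here is a JDK-only sketch with a hypothetical `ArchivedConfigRecord` class (not Flink's). Declaring `serialVersionUID` explicitly pins the version of the serialized form. Without it, the JVM derives a value from the class structure, and adding a non-private constructor, as this PR does, changes that derived value.
    
    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    
    // Hypothetical archived value class with an explicit serialVersionUID.
    final class ArchivedConfigRecord implements Serializable {
        // Pinned explicitly; otherwise the JVM computes one from the class structure.
        private static final long serialVersionUID = 1L;
    
        final String executionMode;
        final int parallelism;
    
        ArchivedConfigRecord(String executionMode, int parallelism) {
            this.executionMode = executionMode;
            this.parallelism = parallelism;
        }
    
        // Serializes and deserializes an instance in memory.
        static ArchivedConfigRecord roundTrip(ArchivedConfigRecord in) throws IOException, ClassNotFoundException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ArchivedConfigRecord) objIn.readObject();
            }
        }
    }
    ```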


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103429470
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---
    @@ -41,19 +43,23 @@ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
     
     	@Override
     	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
    +		return createJobExceptionsJson(graph);
    +	}
    +
    +	public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
     		StringWriter writer = new StringWriter();
     		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
     
     		gen.writeStartObject();
     		
     		// most important is the root failure cause
     		String rootException = graph.getFailureCauseAsString();
    -		if (rootException != null) {
    -			gen.writeStringField("root-exception", rootException);
    +		if (!rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
    --- End diff --
    
    Should we check both conditions?
    ```java
    if (rootException != null && !rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
        gen.writeStringField("root-exception", rootException);
    }
    ```
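    Why both conditions matter: in the diff as written, `rootException.equals(...)` would throw a `NullPointerException` if `getFailureCauseAsString()` ever returned `null`. Flipping the receiver avoids the exception, but `!SENTINEL.equals(null)` is `true`, so a `null` would then be written. A JDK-only sketch; `RootExceptionGuard` is a hypothetical helper, and its constant is a local stand-in for `ExceptionUtils.STRINGIFIED_NULL_EXCEPTION` whose value `"(null)"` is an assumption.
    
    ```java
    // Hypothetical helper mirroring the suggested guard in JobExceptionsHandler.
    final class RootExceptionGuard {
        // Local stand-in for ExceptionUtils.STRINGIFIED_NULL_EXCEPTION; the value is an assumption.
        static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
    
        private RootExceptionGuard() {
        }
    
        // True only for a real failure cause: neither null nor the "no exception" sentinel.
        static boolean shouldWriteRootException(String rootException) {
            return rootException != null && !rootException.equals(STRINGIFIED_NULL_EXCEPTION);
        }
    }
    ```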


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103434202
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/JsonTestUtils.java ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonFactory;
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ArrayNode;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecution;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecution;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +
    +import java.net.InetAddress;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class JsonTestUtils {
    --- End diff --
    
    Could you please add comments? This looks like a very specific utility for job-specific tests, and comments would make that easier to understand than having to check where it is used.


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103430940
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    --- End diff --
    
    If we want to make this generic, I think we have to take a different approach; this is still very manual. I'm happy to discuss how else we could do it.


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103502713
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    --- End diff --
    
    I wasn't mixing things up. For example, in ```Execution#archive()``` we create an ```ArchivedExecution``` through the ```ArchivedExecution(Execution)``` constructor. This pattern is identical for all Archived* classes except the EG (because we need some internal components and don't want to overload the class even more).
    
    Changing the constructor implies changing the ExecutionConfig, which in my opinion is out of scope for this PR. Also, to be consistent, we would have to touch the other ```Archivable``` classes as well.
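    The archiving pattern described here can be sketched as follows, with hypothetical names (`ArchivableSketch`, `LiveAttempt`, `ArchivedAttempt`) rather than Flink's actual classes. The live object exposes `archive()`, which calls a copy constructor on the archived class; a second, field-based constructor exists only so tests can build archived instances directly, without a live object.
    
    ```java
    import java.io.Serializable;
    
    // Hypothetical "archivable" contract: produce a serializable snapshot.
    interface ArchivableSketch<T extends Serializable> {
        T archive();
    }
    
    // Hypothetical live object whose state may still change.
    final class LiveAttempt implements ArchivableSketch<ArchivedAttempt> {
        private final int attemptNumber;
        private String state = "CREATED";
    
        LiveAttempt(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    
        void transitionTo(String newState) {
            this.state = newState;
        }
    
        int getAttemptNumber() {
            return attemptNumber;
        }
    
        String getState() {
            return state;
        }
    
        @Override
        public ArchivedAttempt archive() {
            return new ArchivedAttempt(this); // copy constructor, as in the Execution#archive() pattern
        }
    }
    
    // Hypothetical immutable snapshot of a LiveAttempt.
    final class ArchivedAttempt implements Serializable {
        private static final long serialVersionUID = 1L;
    
        final int attemptNumber;
        final String state;
    
        // Used by archive(): copies the live object's current state.
        ArchivedAttempt(LiveAttempt live) {
            this(live.getAttemptNumber(), live.getState());
        }
    
        // Field-based constructor, intended for tests that need an archived instance directly.
        ArchivedAttempt(int attemptNumber, String state) {
            this.attemptNumber = attemptNumber;
            this.state = state;
        }
    }
    ```
    
    Because the snapshot copies values at archive time, later transitions of the live object do not affect it.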


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103431495
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    +		public static final String TASKMANAGERS = "taskmanagers";
    +		public static final String JOB_ID = "jid";
    +		public static final String ID = "id";
    +		public static final String NAME = "name";
    +		public static final String STATE = "state";
    +		public static final String IS_STOPPABLE = "isStoppable";
    +		public static final String PARALLELISM = "parallelism";
    +		public static final String PLAN = "plan";
    +
    +		public static final String START_TIME = "start-time";
    +		public static final String END_TIME = "end-time";
    +		public static final String DURATION = "duration";
    +		public static final String NOW = "now";
    +		public static final String LAST_MODIFICATION = "last-modification";
    +
    +		public static final String TIMESTAMP = "timestamp";
    +		public static final String TIMESTAMPS = "timestamps";
    +		public static final String STATUS_COUNTS = "status-counts";
    +
    +		public static final String REFRESH_INTERVAL = "refresh-interval";
    +		public static final String TIMEZONE_OFFSET = "timezone-offset";
    +		public static final String TIMEZONE_NAME = "timezone-name";
    +		public static final String FLINK_VERSION = "flink-version";
    +		public static final String FLINK_REVISION = "flink-revision";
    +
    +		public static final String EXECUTION_CONFIG = "execution-config";
    +		public static final String MODE = "mode";
    +		public static final String EXECUTION_MODE = "execution-mode";
    +		public static final String RESTART_STRATEGY = "restart-strategy";
    +		public static final String JOB_PARALLELISM = "job-parallelism";
    +		public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
    +		public static final String USER_CONFIG = "user-config";
    +
    +		public static final String ROOT_EXCEPTION = "root-exception";
    +		public static final String ALL_EXCEPTIONS = "all-exceptions";
    +		public static final String EXCEPTION = "exception";
    +		public static final String TRUNCATED = "truncated";
    +
    +		public static final String HOST = "host";
    +		public static final String LOCATION = "location";
    +
    +		public static final String VERTICES = "vertices";
    +		public static final String TASKS = "tasks";
    +		public static final String TASK = "task";
    +		public static final String SUBTASKS = "subtasks";
    +		public static final String SUBTASK = "subtask";
    +		public static final String ATTEMPT = "attempt";
    +
    +		public static final String STATUS = "status";
    +		public static final String TOTAL = "total";
    +		public static final String PENDING = "pending";
    +		public static final String RUNNING = "running";
    +		public static final String FINISHED = "finished";
    +		public static final String CANCELING = "canceling";
    +		public static final String CANCELED = "canceled";
    +		public static final String FAILED = "failed";
    +		public static final String RESTORED = "restored";
    +		public static final String PENDING_OR_FAILED = "pending_or_failed";
    +		public static final String DISCARDED = "discarded";
    +		public static final String IN_PROGRESS = "in_progress";
    +		public static final String COMPLETED = "completed";
    +
    +		public static final String METRICS = "metrics";
    +		public static final String WRITE_BYTES = "write-bytes";
    +		public static final String READ_BYTES = "read-bytes";
    +		public static final String WRITE_RECORDS = "write-records";
    +		public static final String READ_RECORDS = "read-records";
    +		public static final String TYPE = "type";
    +		public static final String VALUE = "value";
    +
    +		public static final String MIN = "min";
    +		public static final String MAX = "max";
    +		public static final String AVG = "avg";
    +
    +		public static final String JOB_ACCUMULATORS = "job-accumulators";
    +		public static final String USER_ACCUMULATORS = "user-accumulators";
    +		public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +		
    +		public static final String COUNTS = "counts";
    +		public static final String EXTERNALIZATION = "externalization";
    +		public static final String EXTERNAL_PATH = "external-path";
    +		public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
    +		public static final String HISTORY = "history";
    +
    +		public static final String SUMMARY = "summary";
    +		public static final String STATE_SIZE = "state_size";
    +		public static final String ETE_DURATION = "end_to_end_duration";
    +		public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
    +		public static final String SAVEPOINT = "savepoint";
    +		public static final String IS_SAVEPOINT = "is_savepoint";
    +		public static final String CHECKPOINT = "checkpoint";
    +		public static final String CHECKPOINT_DURATION = "checkpoint_duration";
    +		public static final String SYNC = "sync";
    +		public static final String ASYNC = "async";
    +		public static final String ALIGNMENT = "alignment";
    +		public static final String BUFFERED = "buffered";
    +		
    +		public static final String LATEST = "latest";
    +		
    +		public static final String FAILURE_TIMESTAMP = "failure_timestamp";
    +		public static final String FAILURE_MESSAGE = "failure_message";
    +		public static final String RESTORE_TIMESTAMP = "restore_timestamp";
    +		
    +		public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
    +		public static final String ACK_TIMESTAMP = "ack_timestamp";
    +		public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +		
    +		public static final String NUM_SUBTASKS = "num_subtasks";
    +		public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +		public static final String INDEX = "index";
    +		public static final String INTERVAL = "interval";
    +		public static final String ENABLED = "enabled";
    +		public static final String TIMEOUT = "timeout";
    +		public static final String MIN_PAUSE = "min_pause";
    +		public static final String MAX_CONCURRENT = "max_concurrent";
    +
    +		private Keys() {
    +		}
    +	}
    +
    +	public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
    +		CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
    +	}
    +
    +	public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
    +		gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
    +		gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
    +		gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
    +	}
    +
    +	public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
    +		if (state.isTerminal()) {
    +			if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
    +				summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
    +				summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
    +				summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
    +				summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
    +				summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
    +			}
    +		} else { // execAttempt is still running, use MetricQueryService instead
    +			if (fetcher != null) {
    +				fetcher.update();
    +				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex);
    +				if (metrics != null) {
    +					summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")));
    +					summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")));
    +					summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")));
    +					summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")));
    +					summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")));
    +				}
    +			}
    +		}
    +	}
    +
    +	public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException {
    +		gen.writeObjectFieldStart(Keys.METRICS);
    +		gen.writeNumberField(Keys.READ_BYTES, metrics.getNumBytesInLocal() + metrics.getNumBytesInRemote());
    +		gen.writeNumberField(Keys.WRITE_BYTES, metrics.getNumBytesOut());
    +		gen.writeNumberField(Keys.READ_RECORDS, metrics.getNumRecordsIn());
    +		gen.writeNumberField(Keys.WRITE_RECORDS, metrics.getNumRecordsOut());
    +		gen.writeEndObject();
    +	}
    +
    +	public static class MutableIOMetrics extends IOMetrics {
    --- End diff --
    
    Missing `serialVersionUID`. I would move all the related metrics utilities to this class. Please also add comments to the class. I think it's OK to make this class a top-level class in the `org.apache.flink.runtime.webmonitor.utils` package.
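A minimal sketch of what this review asks for: a top-level, documented accumulator class with an explicit `serialVersionUID`. Flink's real `IOMetrics` is not reproduced here. The sketch instead uses a hypothetical serializable stand-in, `BaseIOMetrics`, holding the five counters that appear in the diff; its fields and visibility are assumptions for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for IOMetrics: a serializable, read-only view of five I/O counters. */
class BaseIOMetrics implements Serializable {
    private static final long serialVersionUID = 1L;

    protected long numBytesInLocal;
    protected long numBytesInRemote;
    protected long numBytesOut;
    protected long numRecordsIn;
    protected long numRecordsOut;

    long getNumBytesInLocal() { return numBytesInLocal; }
    long getNumBytesInRemote() { return numBytesInRemote; }
    long getNumBytesOut() { return numBytesOut; }
    long getNumRecordsIn() { return numRecordsIn; }
    long getNumRecordsOut() { return numRecordsOut; }
}

/**
 * Mutable variant of the I/O metrics. The web frontend uses it to sum the metrics of
 * several subtasks into a single aggregate before writing that aggregate as JSON.
 */
class MutableIOMetrics extends BaseIOMetrics {
    // Declared explicitly: without it the JVM computes an ID from the class structure, so
    // changes such as adding a method break deserialization of previously written instances.
    private static final long serialVersionUID = 1L;

    void addNumBytesInLocal(long v) { numBytesInLocal += v; }
    void addNumBytesInRemote(long v) { numBytesInRemote += v; }
    void addNumBytesOut(long v) { numBytesOut += v; }
    void addNumRecordsIn(long v) { numRecordsIn += v; }
    void addNumRecordsOut(long v) { numRecordsOut += v; }
}

class SerialVersionDemo {
    /** Serializes and deserializes the metrics, as shipping them between JVMs would. */
    static MutableIOMetrics roundTrip(MutableIOMetrics in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream objIn =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MutableIOMetrics) objIn.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        MutableIOMetrics m = new MutableIOMetrics();
        m.addNumBytesOut(100);
        m.addNumBytesOut(50);
        System.out.println(roundTrip(m).getNumBytesOut()); // prints 150
    }
}
```

Pinning the ID to a constant keeps archived or shipped instances readable across compatible edits to the class; bump it deliberately only when the serialized form really changes.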


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103477920
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---
    @@ -41,19 +43,23 @@ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
     
     	@Override
     	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
    +		return createJobExceptionsJson(graph);
    +	}
    +
    +	public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
     		StringWriter writer = new StringWriter();
     		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
     
     		gen.writeStartObject();
     		
     		// most important is the root failure cause
     		String rootException = graph.getFailureCauseAsString();
    -		if (rootException != null) {
    -			gen.writeStringField("root-exception", rootException);
    +		if (!rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
    --- End diff --
    
    That's reasonable.
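The diff above replaces a `null` check with a comparison against `ExceptionUtils.STRINGIFIED_NULL_EXCEPTION`. That only makes sense if `getFailureCauseAsString()` returns a sentinel string rather than `null` when there is no failure cause. A minimal sketch of the pattern, assuming the sentinel is the literal `"(null)"` (check this against `ExceptionUtils`); `FailureJson` and its methods are hypothetical, and the JSON is built by hand to stay dependency-free.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Sketch: emit "root-exception" only when a real failure cause exists. */
class FailureJson {
    // Assumed sentinel value; stands in for ExceptionUtils.STRINGIFIED_NULL_EXCEPTION.
    static final String STRINGIFIED_NULL_EXCEPTION = "(null)";

    /** Returns the stack trace of t as text, or the sentinel for null; never returns null. */
    static String stringify(Throwable t) {
        if (t == null) {
            return STRINGIFIED_NULL_EXCEPTION;
        }
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }

    /**
     * stringify() never returns null, so the sentinel comparison is the check that matters;
     * the null guard is only defensive.
     */
    static String toJson(String rootException) {
        StringBuilder sb = new StringBuilder("{");
        if (rootException != null && !STRINGIFIED_NULL_EXCEPTION.equals(rootException)) {
            sb.append("\"root-exception\":\"").append(escape(rootException)).append('"');
        }
        return sb.append('}').toString();
    }

    /** Minimal JSON string escaping: backslash first, then quotes and common whitespace. */
    static String escape(String s) {
        return s.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r")
                .replace("\t", "\\t");
    }

    public static void main(String[] args) {
        System.out.println(toJson(stringify(null))); // prints {}
        System.out.println(toJson(stringify(new RuntimeException("boom"))));
    }
}
```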


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103710293
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class MutableIOMetrics extends IOMetrics {
    --- End diff --
    
    Could you write a high-level comment explaining why we have this as a utility for the web frontend?
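One plausible answer, inferred from the diff rather than stated in it: `addIOMetrics` takes a `MutableIOMetrics summedMetrics` argument, and `writeIOMetrics` reports `read-bytes` as local plus remote input bytes. Together these suggest the class exists so that handlers can sum per-subtask metrics into one vertex-level (or job-level) total before writing JSON. A minimal sketch of that aggregation; `SubtaskIO`, `IOSum`, and `VertexMetricsDemo` are hypothetical names, and the JSON is built by hand.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical per-subtask snapshot of the five I/O counters named in the diff. */
final class SubtaskIO {
    final long bytesInLocal, bytesInRemote, bytesOut, recordsIn, recordsOut;

    SubtaskIO(long bytesInLocal, long bytesInRemote, long bytesOut, long recordsIn, long recordsOut) {
        this.bytesInLocal = bytesInLocal;
        this.bytesInRemote = bytesInRemote;
        this.bytesOut = bytesOut;
        this.recordsIn = recordsIn;
        this.recordsOut = recordsOut;
    }
}

/** Mutable accumulator: one instance per vertex (or job), fed once per subtask. */
final class IOSum {
    long bytesInLocal, bytesInRemote, bytesOut, recordsIn, recordsOut;

    void add(SubtaskIO s) {
        bytesInLocal += s.bytesInLocal;
        bytesInRemote += s.bytesInRemote;
        bytesOut += s.bytesOut;
        recordsIn += s.recordsIn;
        recordsOut += s.recordsOut;
    }

    /** Mirrors writeIOMetrics: "read-bytes" reports local plus remote input bytes. */
    String toJson() {
        return "{\"metrics\":{"
                + "\"read-bytes\":" + (bytesInLocal + bytesInRemote)
                + ",\"write-bytes\":" + bytesOut
                + ",\"read-records\":" + recordsIn
                + ",\"write-records\":" + recordsOut
                + "}}";
    }
}

class VertexMetricsDemo {
    static String aggregate(List<SubtaskIO> subtasks) {
        IOSum sum = new IOSum();
        for (SubtaskIO s : subtasks) {
            sum.add(s);
        }
        return sum.toJson();
    }

    public static void main(String[] args) {
        System.out.println(aggregate(Arrays.asList(
                new SubtaskIO(10, 5, 20, 3, 4),
                new SubtaskIO(1, 2, 3, 4, 5))));
        // prints {"metrics":{"read-bytes":18,"write-bytes":23,"read-records":7,"write-records":9}}
    }
}
```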


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103475734
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    --- End diff --
    
    I think the way to do this is to get rid of the manual JSON creation completely, but I don't see us doing that any time soon. The majority of the keys are either only used in a single handler (+ test) or simply identical to the String they refer to. I understand that it sucks to go through every handler and test again, but this is an unrelated change to begin with. I agree that there can be problems with inconsistencies etc., but I don't see how this solves them (except for what you said about hoping to make them easier to catch).


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103454740
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    --- End diff --
    
    A remnant of the time when all JSON was generated here.


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103477647
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    --- End diff --
    
    Sorry, I was mixing things up. I thought I would have to change the constructors of the other Archived* classes as well to be consistent, but they already had a constructor like this.
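The pattern under discussion is an `Archived*` class that gains a second, field-by-field constructor next to the one that copies from the live object, so that tests can build archived instances directly. A minimal sketch with hypothetical `LiveConfig` and `ArchivedConfig` classes; the fields are chosen for illustration and are not the real `ArchivedExecutionConfig` signature.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical live, mutable job configuration (stand-in for ExecutionConfig). */
class LiveConfig {
    String executionMode = "PIPELINED";
    int parallelism = 4;
    boolean objectReuse = false;
    Map<String, String> userConfig = new HashMap<>();
}

/** Immutable archived copy offering one constructor per use case. */
final class ArchivedConfig {
    final String executionMode;
    final int parallelism;
    final boolean objectReuse;
    final Map<String, String> userConfig;

    /** Production path: snapshot the live configuration when the job is archived. */
    ArchivedConfig(LiveConfig live) {
        this(live.executionMode, live.parallelism, live.objectReuse, live.userConfig);
    }

    /** Test path: set every field directly, without building a live configuration first. */
    ArchivedConfig(String executionMode, int parallelism, boolean objectReuse, Map<String, String> userConfig) {
        this.executionMode = executionMode;
        this.parallelism = parallelism;
        this.objectReuse = objectReuse;
        // Defensive copy so later changes to the caller's map do not leak into the archive.
        this.userConfig = Collections.unmodifiableMap(new HashMap<>(userConfig));
    }

    public static void main(String[] args) {
        LiveConfig live = new LiveConfig();
        live.userConfig.put("input", "/tmp/in");
        ArchivedConfig fromLive = new ArchivedConfig(live);
        ArchivedConfig forTest = new ArchivedConfig("BATCH", 2, true, Collections.singletonMap("k", "v"));
        live.userConfig.put("input", "/tmp/changed");
        System.out.println(fromLive.userConfig.get("input")); // prints /tmp/in
        System.out.println(forTest.parallelism);               // prints 2
    }
}
```

Delegating the copy constructor to the field-wise one keeps a single place where the archived state is assigned, so both paths stay consistent.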


[GitHub] flink issue #3365: [FLINK-5852] Move handler JSON generation code into stati...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/3365
  
    Thanks for addressing the comments. 👍
    
    I had two last, trivial inline questions. Could you go ahead and merge this, and decide whether it makes sense to address them?


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103456991
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    +		public static final String TASKMANAGERS = "taskmanagers";
    +		public static final String JOB_ID = "jid";
    +		public static final String ID = "id";
    +		public static final String NAME = "name";
    +		public static final String STATE = "state";
    +		public static final String IS_STOPPABLE = "isStoppable";
    +		public static final String PARALLELISM = "parallelism";
    +		public static final String PLAN = "plan";
    +
    +		public static final String START_TIME = "start-time";
    +		public static final String END_TIME = "end-time";
    +		public static final String DURATION = "duration";
    +		public static final String NOW = "now";
    +		public static final String LAST_MODIFICATION = "last-modification";
    +
    +		public static final String TIMESTAMP = "timestamp";
    +		public static final String TIMESTAMPS = "timestamps";
    +		public static final String STATUS_COUNTS = "status-counts";
    +
    +		public static final String REFRESH_INTERVAL = "refresh-interval";
    +		public static final String TIMEZONE_OFFSET = "timezone-offset";
    +		public static final String TIMEZONE_NAME = "timezone-name";
    +		public static final String FLINK_VERSION = "flink-version";
    +		public static final String FLINK_REVISION = "flink-revision";
    +
    +		public static final String EXECUTION_CONFIG = "execution-config";
    +		public static final String MODE = "mode";
    +		public static final String EXECUTION_MODE = "execution-mode";
    +		public static final String RESTART_STRATEGY = "restart-strategy";
    +		public static final String JOB_PARALLELISM = "job-parallelism";
    +		public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
    +		public static final String USER_CONFIG = "user-config";
    +
    +		public static final String ROOT_EXCEPTION = "root-exception";
    +		public static final String ALL_EXCEPTIONS = "all-exceptions";
    +		public static final String EXCEPTION = "exception";
    +		public static final String TRUNCATED = "truncated";
    +
    +		public static final String HOST = "host";
    +		public static final String LOCATION = "location";
    +
    +		public static final String VERTICES = "vertices";
    +		public static final String TASKS = "tasks";
    +		public static final String TASK = "task";
    +		public static final String SUBTASKS = "subtasks";
    +		public static final String SUBTASK = "subtask";
    +		public static final String ATTEMPT = "attempt";
    +
    +		public static final String STATUS = "status";
    +		public static final String TOTAL = "total";
    +		public static final String PENDING = "pending";
    +		public static final String RUNNING = "running";
    +		public static final String FINISHED = "finished";
    +		public static final String CANCELING = "canceling";
    +		public static final String CANCELED = "canceled";
    +		public static final String FAILED = "failed";
    +		public static final String RESTORED = "restored";
    +		public static final String PENDING_OR_FAILED = "pending_or_failed";
    +		public static final String DISCARDED = "discarded";
    +		public static final String IN_PROGRESS = "in_progress";
    +		public static final String COMPLETED = "completed";
    +
    +		public static final String METRICS = "metrics";
    +		public static final String WRITE_BYTES = "write-bytes";
    +		public static final String READ_BYTES = "read-bytes";
    +		public static final String WRITE_RECORDS = "write-records";
    +		public static final String READ_RECORDS = "read-records";
    +		public static final String TYPE = "type";
    +		public static final String VALUE = "value";
    +
    +		public static final String MIN = "min";
    +		public static final String MAX = "max";
    +		public static final String AVG = "avg";
    +
    +		public static final String JOB_ACCUMULATORS = "job-accumulators";
    +		public static final String USER_ACCUMULATORS = "user-accumulators";
    +		public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +		
    +		public static final String COUNTS = "counts";
    +		public static final String EXTERNALIZATION = "externalization";
    +		public static final String EXTERNAL_PATH = "external-path";
    +		public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
    +		public static final String HISTORY = "history";
    +
    +		public static final String SUMMARY = "summary";
    +		public static final String STATE_SIZE = "state_size";
    +		public static final String ETE_DURATION = "end_to_end_duration";
    +		public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
    +		public static final String SAVEPOINT = "savepoint";
    +		public static final String IS_SAVEPOINT = "is_savepoint";
    +		public static final String CHECKPOINT = "checkpoint";
    +		public static final String CHECKPOINT_DURATION = "checkpoint_duration";
    +		public static final String SYNC = "sync";
    +		public static final String ASYNC = "async";
    +		public static final String ALIGNMENT = "alignment";
    +		public static final String BUFFERED = "buffered";
    +		
    +		public static final String LATEST = "latest";
    +		
    +		public static final String FAILURE_TIMESTAMP = "failure_timestamp";
    +		public static final String FAILURE_MESSAGE = "failure_message";
    +		public static final String RESTORE_TIMESTAMP = "restore_timestamp";
    +		
    +		public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
    +		public static final String ACK_TIMESTAMP = "ack_timestamp";
    +		public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +		
    +		public static final String NUM_SUBTASKS = "num_subtasks";
    +		public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +		public static final String INDEX = "index";
    +		public static final String INTERVAL = "interval";
    +		public static final String ENABLED = "enabled";
    +		public static final String TIMEOUT = "timeout";
    +		public static final String MIN_PAUSE = "min_pause";
    +		public static final String MAX_CONCURRENT = "max_concurrent";
    +
    +		private Keys() {
    +		}
    +	}
    +
    +	public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
    +		CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
    +	}
    +
    +	public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
    +		gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
    +		gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
    +		gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
    +	}
    +
    +	public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
    +		if (state.isTerminal()) {
    +			if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
    +				summedMetrics.addNumBytesInLocal(ioMetrics.getNumBytesInLocal());
    +				summedMetrics.addNumBytesInRemote(ioMetrics.getNumBytesInRemote());
    +				summedMetrics.addNumBytesOut(ioMetrics.getNumBytesOut());
    +				summedMetrics.addNumRecordsIn(ioMetrics.getNumRecordsIn());
    +				summedMetrics.addNumRecordsOut(ioMetrics.getNumRecordsOut());
    +			}
    +		} else { // execAttempt is still running, use MetricQueryService instead
    +			if (fetcher != null) {
    +				fetcher.update();
    +				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, subtaskIndex);
    +				if (metrics != null) {
    +					summedMetrics.addNumBytesInLocal(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")));
    +					summedMetrics.addNumBytesInRemote(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")));
    +					summedMetrics.addNumBytesOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")));
    +					summedMetrics.addNumRecordsIn(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")));
    +					summedMetrics.addNumRecordsOut(Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")));
    +				}
    +			}
    +		}
    +	}
    +
    +	public static void writeIOMetrics(JsonGenerator gen, IOMetrics metrics) throws IOException {
    +		gen.writeObjectFieldStart(Keys.METRICS);
    +		gen.writeNumberField(Keys.READ_BYTES, metrics.getNumBytesInLocal() + metrics.getNumBytesInRemote());
    +		gen.writeNumberField(Keys.WRITE_BYTES, metrics.getNumBytesOut());
    +		gen.writeNumberField(Keys.READ_RECORDS, metrics.getNumRecordsIn());
    +		gen.writeNumberField(Keys.WRITE_RECORDS, metrics.getNumRecordsOut());
    +		gen.writeEndObject();
    +	}
    +
    +	public static class MutableIOMetrics extends IOMetrics {
    --- End diff --
    
    I think the name is fine; having it in runtime-web already implies that it is only used by the web frontend.


[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103431622
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    +		public static final String TASKMANAGERS = "taskmanagers";
    +		public static final String JOB_ID = "jid";
    +		public static final String ID = "id";
    +		public static final String NAME = "name";
    +		public static final String STATE = "state";
    +		public static final String IS_STOPPABLE = "isStoppable";
    +		public static final String PARALLELISM = "parallelism";
    +		public static final String PLAN = "plan";
    +
    +		public static final String START_TIME = "start-time";
    +		public static final String END_TIME = "end-time";
    +		public static final String DURATION = "duration";
    +		public static final String NOW = "now";
    +		public static final String LAST_MODIFICATION = "last-modification";
    +
    +		public static final String TIMESTAMP = "timestamp";
    +		public static final String TIMESTAMPS = "timestamps";
    +		public static final String STATUS_COUNTS = "status-counts";
    +
    +		public static final String REFRESH_INTERVAL = "refresh-interval";
    +		public static final String TIMEZONE_OFFSET = "timezone-offset";
    +		public static final String TIMEZONE_NAME = "timezone-name";
    +		public static final String FLINK_VERSION = "flink-version";
    +		public static final String FLINK_REVISION = "flink-revision";
    +
    +		public static final String EXECUTION_CONFIG = "execution-config";
    +		public static final String MODE = "mode";
    +		public static final String EXECUTION_MODE = "execution-mode";
    +		public static final String RESTART_STRATEGY = "restart-strategy";
    +		public static final String JOB_PARALLELISM = "job-parallelism";
    +		public static final String OBJECT_REUSE_MODE = "object-reuse-mode";
    +		public static final String USER_CONFIG = "user-config";
    +
    +		public static final String ROOT_EXCEPTION = "root-exception";
    +		public static final String ALL_EXCEPTIONS = "all-exceptions";
    +		public static final String EXCEPTION = "exception";
    +		public static final String TRUNCATED = "truncated";
    +
    +		public static final String HOST = "host";
    +		public static final String LOCATION = "location";
    +
    +		public static final String VERTICES = "vertices";
    +		public static final String TASKS = "tasks";
    +		public static final String TASK = "task";
    +		public static final String SUBTASKS = "subtasks";
    +		public static final String SUBTASK = "subtask";
    +		public static final String ATTEMPT = "attempt";
    +
    +		public static final String STATUS = "status";
    +		public static final String TOTAL = "total";
    +		public static final String PENDING = "pending";
    +		public static final String RUNNING = "running";
    +		public static final String FINISHED = "finished";
    +		public static final String CANCELING = "canceling";
    +		public static final String CANCELED = "canceled";
    +		public static final String FAILED = "failed";
    +		public static final String RESTORED = "restored";
    +		public static final String PENDING_OR_FAILED = "pending_or_failed";
    +		public static final String DISCARDED = "discarded";
    +		public static final String IN_PROGRESS = "in_progress";
    +		public static final String COMPLETED = "completed";
    +
    +		public static final String METRICS = "metrics";
    +		public static final String WRITE_BYTES = "write-bytes";
    +		public static final String READ_BYTES = "read-bytes";
    +		public static final String WRITE_RECORDS = "write-records";
    +		public static final String READ_RECORDS = "read-records";
    +		public static final String TYPE = "type";
    +		public static final String VALUE = "value";
    +
    +		public static final String MIN = "min";
    +		public static final String MAX = "max";
    +		public static final String AVG = "avg";
    +
    +		public static final String JOB_ACCUMULATORS = "job-accumulators";
    +		public static final String USER_ACCUMULATORS = "user-accumulators";
    +		public static final String USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +		
    +		public static final String COUNTS = "counts";
    +		public static final String EXTERNALIZATION = "externalization";
    +		public static final String EXTERNAL_PATH = "external-path";
    +		public static final String DELETE_ON_CANCEL = "delete_on_cancellation";
    +		public static final String HISTORY = "history";
    +
    +		public static final String SUMMARY = "summary";
    +		public static final String STATE_SIZE = "state_size";
    +		public static final String ETE_DURATION = "end_to_end_duration";
    +		public static final String ALIGNMENT_BUFFERED = "alignment_buffered";
    +		public static final String SAVEPOINT = "savepoint";
    +		public static final String IS_SAVEPOINT = "is_savepoint";
    +		public static final String CHECKPOINT = "checkpoint";
    +		public static final String CHECKPOINT_DURATION = "checkpoint_duration";
    +		public static final String SYNC = "sync";
    +		public static final String ASYNC = "async";
    +		public static final String ALIGNMENT = "alignment";
    +		public static final String BUFFERED = "buffered";
    +		
    +		public static final String LATEST = "latest";
    +		
    +		public static final String FAILURE_TIMESTAMP = "failure_timestamp";
    +		public static final String FAILURE_MESSAGE = "failure_message";
    +		public static final String RESTORE_TIMESTAMP = "restore_timestamp";
    +		
    +		public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
    +		public static final String ACK_TIMESTAMP = "ack_timestamp";
    +		public static final String LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
    +		
    +		public static final String NUM_SUBTASKS = "num_subtasks";
    +		public static final String NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
    +		public static final String INDEX = "index";
    +		public static final String INTERVAL = "interval";
    +		public static final String ENABLED = "enabled";
    +		public static final String TIMEOUT = "timeout";
    +		public static final String MIN_PAUSE = "min_pause";
    +		public static final String MAX_CONCURRENT = "max_concurrent";
    +
    +		private Keys() {
    +		}
    +	}
    +
    +	public static void writeJobDetailOverviewAsJson(AccessExecutionGraph graph, JsonGenerator gen) throws IOException {
    +		CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
    +	}
    +
    +	public static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
    +		gen.writeNumberField(Keys.MIN, minMaxAvg.getMinimum());
    +		gen.writeNumberField(Keys.MAX, minMaxAvg.getMaximum());
    +		gen.writeNumberField(Keys.AVG, minMaxAvg.getAverage());
    +	}
    +
    +	public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
    --- End diff --
    
    Move these to the `MutableIOMetrics` class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103473295
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    +			String executionMode,
    +			String restartStrategyDescription,
    +			int parallelism,
    +			boolean objectReuseEnabled,
    +			Map<String, String> globalJobParameters) {
    +		this.executionMode = executionMode;
    --- End diff --
    
    But isn't the other constructor already exactly trying to ensure what you say "simply won't happen"?
    
    ```java
    if (ec.getGlobalJobParameters() != null
            && ec.getGlobalJobParameters().toMap() != null) {
        globalJobParameters = ec.getGlobalJobParameters().toMap();
    } else {
        globalJobParameters = Collections.emptyMap();
    }
    ```
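One way for the explicit constructor to uphold the same invariant as the `ExecutionConfig`-based one is to normalize a `null` map to an empty map, rather than trusting callers never to pass `null`. Below is a minimal sketch. `ArchivedConfigSketch` is a hypothetical stand-in that models only the `globalJobParameters` field. It is not Flink's actual class.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for ArchivedExecutionConfig; only the field
// discussed in this thread is modelled.
final class ArchivedConfigSketch {

    private final Map<String, String> globalJobParameters;

    // Explicit constructor: mirrors the normalization done by the
    // ExecutionConfig-based constructor, so a null map passed by a test
    // never ends up stored in the object.
    ArchivedConfigSketch(Map<String, String> globalJobParameters) {
        this.globalJobParameters = globalJobParameters != null
            ? globalJobParameters
            : Collections.<String, String>emptyMap();
    }

    Map<String, String> getGlobalJobParameters() {
        return globalJobParameters;
    }
}
```

With this in place, code reading `getGlobalJobParameters()` can rely on a non-null map regardless of which constructor created the object.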



[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103433744
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/BuilderUtils.java ---
    @@ -0,0 +1,436 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import org.apache.flink.api.common.ArchivedExecutionConfig;
    +import org.apache.flink.api.common.ExecutionMode;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.metrics.Counter;
    +import org.apache.flink.metrics.MeterView;
    +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecution;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    +import org.apache.flink.runtime.util.EvictingBoundedList;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.SerializedValue;
    +
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +
    +public class BuilderUtils {
    --- End diff --
    
    Could you please move the builders out to top level classes and remove the utils class?
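Pulling a builder out of `BuilderUtils` into its own top-level class could look roughly like the sketch below. The names `ArchivedConfigStub` and `ArchivedConfigStubBuilder` are hypothetical. The product type is a simplified stand-in for an `Archived*` class, and the default values (for example `"PIPELINED"`) are assumptions chosen for illustration. In the real code base, each builder would presumably get its own file in the test `utils` package. Both classes appear together here only to keep the example self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical product type standing in for an Archived* class.
final class ArchivedConfigStub {
    final String executionMode;
    final int parallelism;
    final boolean objectReuseEnabled;
    final Map<String, String> globalJobParameters;

    ArchivedConfigStub(String executionMode, int parallelism,
                       boolean objectReuseEnabled, Map<String, String> globalJobParameters) {
        this.executionMode = executionMode;
        this.parallelism = parallelism;
        this.objectReuseEnabled = objectReuseEnabled;
        this.globalJobParameters = globalJobParameters;
    }
}

// Hypothetical top-level test builder: every field has a default, so a
// test only sets the values it actually asserts on.
final class ArchivedConfigStubBuilder {
    private String executionMode = "PIPELINED"; // assumed default
    private int parallelism = 1;
    private boolean objectReuseEnabled = false;
    private final Map<String, String> globalJobParameters = new HashMap<>();

    ArchivedConfigStubBuilder setExecutionMode(String executionMode) {
        this.executionMode = executionMode;
        return this;
    }

    ArchivedConfigStubBuilder setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    ArchivedConfigStubBuilder setObjectReuseEnabled(boolean objectReuseEnabled) {
        this.objectReuseEnabled = objectReuseEnabled;
        return this;
    }

    ArchivedConfigStubBuilder addGlobalJobParameter(String key, String value) {
        globalJobParameters.put(key, value);
        return this;
    }

    // Copies the parameters so that later builder calls cannot mutate a built object.
    ArchivedConfigStub build() {
        return new ArchivedConfigStub(executionMode, parallelism, objectReuseEnabled,
            Collections.unmodifiableMap(new HashMap<>(globalJobParameters)));
    }
}
```

A test would then write `new ArchivedConfigStubBuilder().setParallelism(4).build()` instead of calling a static factory on a shared utility class.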



[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103712305
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class MutableIOMetrics extends IOMetrics {
    --- End diff --
    
    ok.



[GitHub] flink issue #3365: [FLINK-5852] Move handler JSON generation code into stati...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3365
  
    @uce I will address your final comments while merging.



[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103430761
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/JsonUtils.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.annotation.Public;
    +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class JsonUtils {
    +	public static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
    +
    +	@Public
    +	public static final class Keys {
    --- End diff --
    
    I'm skeptical about these, because they blow up this class again. I like that we "guard" the names here by marking them as `@Public`, but I think I would revert this again. Some keys are very common and I don't see the benefit in accessing them here. Others are very rare and only used in one handler anyway.



[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103473952
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---
    @@ -41,19 +43,23 @@ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
     
     	@Override
     	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
    +		return createJobExceptionsJson(graph);
    +	}
    +
    +	public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
     		StringWriter writer = new StringWriter();
     		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
     
     		gen.writeStartObject();
     		
     		// most important is the root failure cause
     		String rootException = graph.getFailureCauseAsString();
    -		if (rootException != null) {
    -			gen.writeStringField("root-exception", rootException);
    +		if (!rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
    --- End diff --
    
    Then maybe add a nonNull check to guard against unintended future changes of behaviour in ExecutionGraph?
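A null-safe form of the check in the diff might look like the sketch below. An explicit `null` test is placed before the sentinel comparison, so that if `getFailureCauseAsString()` ever starts returning `null`, the `root-exception` field is skipped instead of throwing a `NullPointerException`. The sentinel value `"(null)"` is an assumption that stands in for `ExceptionUtils.STRINGIFIED_NULL_EXCEPTION`. The class and method names are hypothetical.

```java
// Hypothetical helper illustrating a null-safe root-exception check.
final class RootExceptionCheck {

    // Assumed value standing in for ExceptionUtils.STRINGIFIED_NULL_EXCEPTION.
    static final String STRINGIFIED_NULL_EXCEPTION = "(null)";

    // True only if there is a real failure cause to report:
    // a null string and the "no exception" sentinel are both treated as absent.
    static boolean hasRootException(String rootException) {
        return rootException != null && !rootException.equals(STRINGIFIED_NULL_EXCEPTION);
    }
}
```

Flipping the comparison to `!STRINGIFIED_NULL_EXCEPTION.equals(rootException)` would also avoid the NPE. However, it would treat `null` as a real cause and write a `null` field, which is why the sketch uses an explicit check.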



[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103477833
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---
    @@ -54,6 +54,19 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
     		}
     	}
     
    +	public ArchivedExecutionConfig(
    +			String executionMode,
    +			String restartStrategyDescription,
    +			int parallelism,
    +			boolean objectReuseEnabled,
    +			Map<String, String> globalJobParameters) {
    +		this.executionMode = executionMode;
    --- End diff --
    
    Yes, but they don't throw exceptions, do they?
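If the intent is to surface a bad argument immediately rather than silently normalizing it, the explicit constructor could fail fast instead. The sketch below uses `java.util.Objects.requireNonNull`. Inside Flink, `org.apache.flink.util.Preconditions.checkNotNull` (already imported by `BuilderUtils`) would be the natural equivalent. `StrictArchivedConfigSketch` is a hypothetical stand-in with only two of the real fields.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in showing a fail-fast explicit constructor.
final class StrictArchivedConfigSketch {

    private final String executionMode;
    private final Map<String, String> globalJobParameters;

    StrictArchivedConfigSketch(String executionMode, Map<String, String> globalJobParameters) {
        // A null argument is treated as a caller bug and rejected with a
        // NullPointerException naming the offending parameter.
        this.executionMode = Objects.requireNonNull(executionMode, "executionMode");
        this.globalJobParameters = Collections.unmodifiableMap(
            new HashMap<>(Objects.requireNonNull(globalJobParameters, "globalJobParameters")));
    }

    String getExecutionMode() {
        return executionMode;
    }

    Map<String, String> getGlobalJobParameters() {
        return globalJobParameters;
    }
}
```

The trade-off with normalizing is that a test passing `null` by mistake fails loudly at construction time instead of producing an object that silently differs from what the test intended.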



[GitHub] flink pull request #3365: [FLINK-5852] Move handler JSON generation code int...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3365#discussion_r103710173
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.utils;
    +
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import org.apache.flink.runtime.executiongraph.IOMetrics;
    +import org.apache.flink.runtime.metrics.MetricNames;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    +import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
    +
    +import javax.annotation.Nullable;
    +import java.io.IOException;
    +
    +public class MutableIOMetrics extends IOMetrics {
    +
    +	private static final long serialVersionUID = -5460777634971381737L;
    +
    +	public MutableIOMetrics() {
    +		super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
    +	}
    +
    +	public void addNumBytesInLocal(long toAdd) {
    +		this.numBytesInLocal += toAdd;
    +	}
    +
    +	public void addNumBytesInRemote(long toAdd) {
    +		this.numBytesInRemote += toAdd;
    +	}
    +
    +	public void addNumBytesOut(long toAdd) {
    +		this.numBytesOut += toAdd;
    +	}
    +
    +	public void addNumRecordsIn(long toAdd) {
    +		this.numRecordsIn += toAdd;
    +	}
    +
    +	public void addNumRecordsOut(long toAdd) {
    +		this.numRecordsOut += toAdd;
    +	}
    +
    +	public static void addIOMetrics(MutableIOMetrics summedMetrics, ExecutionState state, @Nullable IOMetrics ioMetrics, @Nullable MetricFetcher fetcher, String jobID, String taskID, int subtaskIndex) {
    --- End diff --
    
    Quick question: could we make this an instance method instead of static?
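As an instance method, the accumulator sums into itself, so a call site reads `summed.addIOMetrics(...)` instead of passing `summed` as the first argument of a static helper. The sketch below shows that shape with hypothetical stand-ins. `IoSnapshot` plays the role of `IOMetrics`, and `MutableIoSketch` plays the role of `MutableIOMetrics`. The remaining parameters of the real signature (`state`, `fetcher`, `jobID`, `taskID`, `subtaskIndex`) are omitted. In an instance-method version, they would presumably stay as arguments while `summedMetrics` becomes `this`.

```java
// Hypothetical immutable per-subtask snapshot, standing in for IOMetrics.
final class IoSnapshot {
    final long numBytesInLocal;
    final long numBytesInRemote;
    final long numBytesOut;
    final long numRecordsIn;
    final long numRecordsOut;

    IoSnapshot(long numBytesInLocal, long numBytesInRemote, long numBytesOut,
               long numRecordsIn, long numRecordsOut) {
        this.numBytesInLocal = numBytesInLocal;
        this.numBytesInRemote = numBytesInRemote;
        this.numBytesOut = numBytesOut;
        this.numRecordsIn = numRecordsIn;
        this.numRecordsOut = numRecordsOut;
    }
}

// Hypothetical accumulator, standing in for MutableIOMetrics.
final class MutableIoSketch {
    private long numBytesInLocal;
    private long numBytesInRemote;
    private long numBytesOut;
    private long numRecordsIn;
    private long numRecordsOut;

    // Instance method: adds one snapshot into this accumulator.
    // A null snapshot (no metrics available for the attempt) adds nothing.
    void addIOMetrics(IoSnapshot metrics) {
        if (metrics == null) {
            return;
        }
        numBytesInLocal += metrics.numBytesInLocal;
        numBytesInRemote += metrics.numBytesInRemote;
        numBytesOut += metrics.numBytesOut;
        numRecordsIn += metrics.numRecordsIn;
        numRecordsOut += metrics.numRecordsOut;
    }

    long getNumBytesInLocal() { return numBytesInLocal; }
    long getNumBytesInRemote() { return numBytesInRemote; }
    long getNumBytesOut() { return numBytesOut; }
    long getNumRecordsIn() { return numRecordsIn; }
    long getNumRecordsOut() { return numRecordsOut; }
}
```

A handler can then create one accumulator per vertex and call `addIOMetrics` once per subtask attempt inside its loop.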

