Posted to issues@flink.apache.org by GJL <gi...@git.apache.org> on 2018/01/31 19:07:56 UTC

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP

GitHub user GJL opened a pull request:

    https://github.com/apache/flink/pull/5397

    [FLINK-7856][flip6] WIP

    WIP
    
    PR is based on #4893 
    
    @tillrohrmann 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GJL/flink FLINK-7856-3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5397.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5397
    
----
commit bea6bf16b2cac1a8da1f3d28432798965b64cea9
Author: zjureel <zj...@...>
Date:   2017-11-15T05:55:28Z

    [FLINK-7856][flip6] Port JobVertexBackPressureHandler to REST endpoint

commit 968f0dfe8a3c6832b0e6a83c5c61eaa1fd886c1b
Author: gyao <ga...@...>
Date:   2018-01-31T19:01:24Z

    [hotfix][tests] Add Javadocs to JobMasterTest
    
    Add Javadocs to JobMasterTest.
    Remove debug print.

commit 6140fa6f460491bbe0eaf19d15b8a2f5d81622a0
Author: gyao <ga...@...>
Date:   2018-01-31T19:02:59Z

    [FLINK-7856][flip6] Implement JobVertexBackPressureHandler

commit 4625a637fa0a17a0ea0a3a48952d562a29fa5c06
Author: gyao <ga...@...>
Date:   2018-01-31T19:06:09Z

    [hotfix] Log swallowed exception in JobMaster

----


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165835486
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -313,6 +325,13 @@ public JobMaster(
     			.orElse(FutureUtils.completedExceptionally(new JobMasterException("The JobMaster has not been started with a REST endpoint.")));
     
     		this.metricQueryServicePath = metricQueryServicePath;
    +		this.stackTraceSampleCoordinator = new StackTraceSampleCoordinator(rpcService.getExecutor(), rpcTimeout.toMilliseconds());
    +		this.backPressureStatsTracker = new BackPressureStatsTracker(
    --- End diff --
    
    done


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r166228844
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---
    @@ -123,4 +126,19 @@
     			@RpcTimeout Time timeout) {
     		throw new UnsupportedOperationException();
     	}
    +
    +	/**
    +	 * Requests the statistics on operator back pressure.
    +	 *
    +	 * @param jobId       Job for which the stats are requested.
    +	 * @param jobVertexId JobVertex for which the stats are requested.
    +	 * @return A Future to the {@link OperatorBackPressureStats} or {@code null} if the stats are
    --- End diff --
    
    nit: This should be "A future to the {@link OperatorBackPressureStatsResponse}"?


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r166229283
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/VertexBackPressureLevelTest.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for {@link JobVertexBackPressureInfo.VertexBackPressureLevel}.
    + */
    +public class VertexBackPressureLevelTest {
    --- End diff --
    
    `extends TestLogger` is missing.


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165835482
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
    +
    +import java.util.List;
    +import java.util.Objects;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Response type of the {@link JobVertexBackPressureHandler}.
    + */
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class JobVertexBackPressureInfo implements ResponseBody {
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +	public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
    +	public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
    +	public static final String FIELD_NAME_SUBTASKS = "subtasks";
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final VertexBackPressureStatus status;
    +
    +	@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
    +	private final VertexBackPressureLevel backpressureLevel;
    +
    +	@JsonProperty(FIELD_NAME_END_TIMESTAMP)
    +	private final Long endTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_SUBTASKS)
    +	protected final List<SubtaskBackPressureInfo> subtasks;
    +
    +	@JsonCreator
    +	public JobVertexBackPressureInfo(
    +		@JsonProperty(FIELD_NAME_STATUS) VertexBackPressureStatus status,
    +		@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) VertexBackPressureLevel backpressureLevel,
    +		@JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
    +		@JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskBackPressureInfo> subtasks) {
    +		this.status = status;
    +		this.backpressureLevel = backpressureLevel;
    +		this.endTimestamp = endTimestamp;
    +		this.subtasks = subtasks;
    +	}
    +
    +	public static JobVertexBackPressureInfo deprecated() {
    +		return new JobVertexBackPressureInfo(
    +			VertexBackPressureStatus.DEPRECATED,
    --- End diff --
    
    done


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165291604
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -313,6 +325,13 @@ public JobMaster(
     			.orElse(FutureUtils.completedExceptionally(new JobMasterException("The JobMaster has not been started with a REST endpoint.")));
     
     		this.metricQueryServicePath = metricQueryServicePath;
    +		this.stackTraceSampleCoordinator = new StackTraceSampleCoordinator(rpcService.getExecutor(), rpcTimeout.toMilliseconds());
    +		this.backPressureStatsTracker = new BackPressureStatsTracker(
    --- End diff --
    
    The `BackPressureStatsTracker` and the `StackTraceSampleCoordinator` could go into the `JobManagerServices`. That way, we would have a way to pass them into the `JobMaster`.
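
A minimal sketch of the suggestion above, using hypothetical stand-in classes that only borrow their names from the diff (they are not Flink's real `JobManagerServices`, `BackPressureStatsTracker`, `StackTraceSampleCoordinator`, or `JobMaster`). It assumes the services object creates the two components once and that each `JobMaster` receives them through its constructor instead of building them itself:

```java
final class SharedServicesSketch {

    /** Stand-in only; in the diff the real coordinator takes an executor and a timeout. */
    static final class StackTraceSampleCoordinator {}

    /** Stand-in only; assumed here to depend on the coordinator. */
    static final class BackPressureStatsTracker {
        final StackTraceSampleCoordinator coordinator;

        BackPressureStatsTracker(StackTraceSampleCoordinator coordinator) {
            this.coordinator = coordinator;
        }
    }

    /** Stand-in holder that creates the shared components once. */
    static final class JobManagerServices {
        final StackTraceSampleCoordinator stackTraceSampleCoordinator;
        final BackPressureStatsTracker backPressureStatsTracker;

        JobManagerServices() {
            this.stackTraceSampleCoordinator = new StackTraceSampleCoordinator();
            this.backPressureStatsTracker =
                new BackPressureStatsTracker(stackTraceSampleCoordinator);
        }
    }

    /** Stand-in JobMaster that receives the tracker instead of constructing it. */
    static final class JobMaster {
        final BackPressureStatsTracker backPressureStatsTracker;

        JobMaster(JobManagerServices services) {
            this.backPressureStatsTracker = services.backPressureStatsTracker;
        }
    }

    public static void main(String[] args) {
        JobManagerServices services = new JobManagerServices();
        JobMaster first = new JobMaster(services);
        JobMaster second = new JobMaster(services);
        // Both masters hold the same injected tracker instance.
        System.out.println(first.backPressureStatsTracker == second.backPressureStatsTracker);
    }
}
```

Passing the components in this way also lets a test hand the `JobMaster` a prepared tracker, which is hard to do when the constructor creates it internally.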


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165155639
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
    +
    +import java.util.List;
    +import java.util.Objects;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Response type of the {@link JobVertexBackPressureHandler}.
    + */
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class JobVertexBackPressureInfo implements ResponseBody {
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +	public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
    +	public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
    +	public static final String FIELD_NAME_SUBTASKS = "subtasks";
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final VertexBackPressureStatus status;
    +
    +	@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
    +	private final VertexBackPressureLevel backpressureLevel;
    +
    +	@JsonProperty(FIELD_NAME_END_TIMESTAMP)
    +	private final Long endTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_SUBTASKS)
    +	protected final List<SubtaskBackPressureInfo> subtasks;
    +
    +	@JsonCreator
    +	public JobVertexBackPressureInfo(
    +		@JsonProperty(FIELD_NAME_STATUS) VertexBackPressureStatus status,
    +		@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) VertexBackPressureLevel backpressureLevel,
    +		@JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
    +		@JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskBackPressureInfo> subtasks) {
    +		this.status = status;
    +		this.backpressureLevel = backpressureLevel;
    +		this.endTimestamp = endTimestamp;
    +		this.subtasks = subtasks;
    +	}
    +
    +	public static JobVertexBackPressureInfo deprecated() {
    +		return new JobVertexBackPressureInfo(
    +			VertexBackPressureStatus.DEPRECATED,
    +			null,
    +			null,
    +			null);
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +		JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
    +		return Objects.equals(status, that.status) &&
    +			Objects.equals(backpressureLevel, that.backpressureLevel) &&
    +			Objects.equals(endTimestamp, that.endTimestamp) &&
    +			Objects.equals(subtasks, that.subtasks);
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(status, backpressureLevel, endTimestamp, subtasks);
    +	}
    +
    +	//---------------------------------------------------------------------------------
    +	// Static helper classes
    +	//---------------------------------------------------------------------------------
    +
    +	/**
    +	 * Nested class to encapsulate the sub tasks back pressure.
    +	 */
    +	public static final class SubtaskBackPressureInfo {
    +
    +		public static final String FIELD_NAME_SUBTASK = "subtask";
    +		public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
    +		public static final String FIELD_NAME_RATIO = "ratio";
    +
    +		@JsonProperty(FIELD_NAME_SUBTASK)
    +		private final int subtask;
    +
    +		@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
    +		private final VertexBackPressureLevel backpressureLevel;
    +
    +		@JsonProperty(FIELD_NAME_RATIO)
    +		private final double ratio;
    +
    +		public SubtaskBackPressureInfo(
    +			@JsonProperty(FIELD_NAME_SUBTASK) int subtask,
    +			@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) VertexBackPressureLevel backpressureLevel,
    +			@JsonProperty(FIELD_NAME_RATIO) double ratio) {
    +			this.subtask = subtask;
    +			this.backpressureLevel = checkNotNull(backpressureLevel);
    +			this.ratio = ratio;
    +		}
    +
    +		@Override
    +		public boolean equals(Object o) {
    +			if (this == o) {
    +				return true;
    +			}
    +			if (o == null || getClass() != o.getClass()) {
    +				return false;
    +			}
    +			SubtaskBackPressureInfo that = (SubtaskBackPressureInfo) o;
    +			return subtask == that.subtask &&
    +				ratio == that.ratio &&
    +				Objects.equals(backpressureLevel, that.backpressureLevel);
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			return Objects.hash(subtask, backpressureLevel, ratio);
    +		}
    +	}
    +
    +	/**
    +	 * Status of vertex back-pressure.
    +	 */
    +	public enum VertexBackPressureStatus {
    +		DEPRECATED("deprecated"), OK("ok");
    +
    +		private String status;
    +
    +		VertexBackPressureStatus(String status) {
    +			this.status = status;
    +		}
    +
    +		@JsonValue
    --- End diff --
    
    Needs to be tested.


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165835488
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -358,6 +361,18 @@ public void start() throws Exception {
     		}
     	}
     
    +	@Override
    +	public CompletableFuture<Optional<OperatorBackPressureStats>> getOperatorBackPressureStats(
    --- End diff --
    
    done


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165835514
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -305,6 +311,84 @@ public void postStop() throws Exception {
     	//  RPC methods
     	// ======================================================================
     
    +	@Override
    +	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final Time timeout) {
    +		return requestStackTraceSample(
    +			executionAttemptId,
    +			sampleId,
    +			numSamples,
    +			delayBetweenSamples,
    +			maxStackTraceDepth,
    +			new ArrayList<>(numSamples),
    +			new CompletableFuture<>());
    +	}
    +
    +	private CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final List<StackTraceElement[]> currentTraces,
    +			final CompletableFuture<StackTraceSampleResponse> resultFuture) {
    +
    +		if (numSamples > 0) {
    +			getRpcService().getScheduledExecutor().schedule(() -> runAsync(() -> {
    +				final Optional<StackTraceElement[]> stackTrace = getStackTrace(executionAttemptId, maxStackTraceDepth);
    --- End diff --
    
    done


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165290983
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -358,6 +361,18 @@ public void start() throws Exception {
     		}
     	}
     
    +	@Override
    +	public CompletableFuture<Optional<OperatorBackPressureStats>> getOperatorBackPressureStats(
    --- End diff --
    
    I think `Optional` is not serializable. We should always return serializable objects as the result of an RPC.
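
To illustrate the concern: `java.util.Optional` does not implement `java.io.Serializable`, so Java serialization rejects it. The sketch below checks this and shows one possible workaround, a small serializable wrapper whose payload may be null. `Stats` and `StatsResponse` are hypothetical stand-ins invented for this example; they are not the PR's `OperatorBackPressureStats` or `OperatorBackPressureStatsResponse`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Optional;

final class OptionalRpcResultSketch {

    /** Hypothetical stats payload. */
    static final class Stats implements Serializable {
        private static final long serialVersionUID = 1L;
        final long endTimestamp;

        Stats(long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }
    }

    /** Hypothetical serializable wrapper; a null payload means "no stats available". */
    static final class StatsResponse implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Stats stats; // may be null

        StatsResponse(Stats stats) {
            this.stats = stats;
        }

        /** Optional is only created on the receiving side, after deserialization. */
        Optional<Stats> getStats() {
            return Optional.ofNullable(stats);
        }
    }

    /** Returns true if Java serialization accepts the given object. */
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(Optional.of(new Stats(42L))));       // false
        System.out.println(canSerialize(new StatsResponse(new Stats(42L)))); // true
        System.out.println(canSerialize(new StatsResponse(null)));           // true
    }
}
```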


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165291802
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -867,6 +889,25 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
     		}
     	}
     
    +	@Override
    +	public CompletableFuture<Optional<OperatorBackPressureStats>> getOperatorBackPressureStats(
    +			final JobID jobId, final JobVertexID jobVertexId) {
    +		final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
    +		if (jobVertex == null) {
    +			return FutureUtils.completedExceptionally(new FlinkException("JobVertexID not found " +
    +				jobVertexId));
    +		}
    +
    +		final Optional<OperatorBackPressureStats> operatorBackPressureStats =
    +			backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
    +		if (!operatorBackPressureStats.isPresent() ||
    +			backPressureStatsRefreshInterval <= System.currentTimeMillis() - operatorBackPressureStats.get().getEndTimestamp()) {
    +			backPressureStatsTracker.triggerStackTraceSample(jobVertex);
    +			return CompletableFuture.completedFuture(Optional.empty());
    --- End diff --
    
    Why not return the last back-pressure result if there is an old one?
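
One way to read this question, as a sketch: keep the staleness check from the diff, still trigger a new sample when the cached result is missing or too old, but return whatever result is cached instead of an empty value. `Stats` and `Tracker` are hypothetical stand-ins written for this example, not Flink's `OperatorBackPressureStats` or `BackPressureStatsTracker`.

```java
import java.util.Optional;

final class StaleStatsSketch {

    /** Hypothetical stand-in for a back-pressure result. */
    static final class Stats {
        final long endTimestamp;

        Stats(long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }
    }

    /** Hypothetical tracker: holds the last result and counts refresh triggers. */
    static final class Tracker {
        private final Stats last; // may be null
        int triggerCount;

        Tracker(Stats last) {
            this.last = last;
        }

        Optional<Stats> getLast() {
            return Optional.ofNullable(last);
        }

        void triggerSample() {
            triggerCount++;
        }
    }

    /** Triggers a refresh when the result is missing or stale, but still returns the old one. */
    static Optional<Stats> getStats(Tracker tracker, long refreshIntervalMs, long nowMs) {
        Optional<Stats> last = tracker.getLast();
        boolean stale = !last.isPresent()
            || refreshIntervalMs <= nowMs - last.get().endTimestamp;
        if (stale) {
            tracker.triggerSample();
        }
        return last;
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker(new Stats(1_000L));
        Optional<Stats> result = getStats(tracker, 500L, 2_000L);
        // The old result is returned even though a refresh was triggered.
        System.out.println(result.isPresent() + " " + tracker.triggerCount);
    }
}
```

With this shape, a caller sees an empty result only before the first sample has completed.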


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165666773
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java ---
    @@ -75,6 +76,17 @@ public void setDisconnectJobManagerConsumer(Consumer<Tuple2<JobID, Throwable>> d
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    --- End diff --
    
    or maybe not


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r166228267
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -305,6 +311,84 @@ public void postStop() throws Exception {
     	//  RPC methods
     	// ======================================================================
     
    +	@Override
    +	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final Time timeout) {
    +		return requestStackTraceSample(
    +			executionAttemptId,
    +			sampleId,
    +			numSamples,
    +			delayBetweenSamples,
    +			maxStackTraceDepth,
    +			new ArrayList<>(numSamples),
    +			new CompletableFuture<>());
    +	}
    +
    +	private CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final List<StackTraceElement[]> currentTraces,
    +			final CompletableFuture<StackTraceSampleResponse> resultFuture) {
    +
    +		if (numSamples > 0) {
    +			scheduleRunAsync(() -> runAsync(() -> {
    --- End diff --
    
    I think we don't need `() -> runAsync`.


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165298568
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -305,6 +311,84 @@ public void postStop() throws Exception {
     	//  RPC methods
     	// ======================================================================
     
    +	@Override
    +	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final Time timeout) {
    +		return requestStackTraceSample(
    +			executionAttemptId,
    +			sampleId,
    +			numSamples,
    +			delayBetweenSamples,
    +			maxStackTraceDepth,
    +			new ArrayList<>(numSamples),
    +			new CompletableFuture<>());
    +	}
    +
    +	private CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final List<StackTraceElement[]> currentTraces,
    +			final CompletableFuture<StackTraceSampleResponse> resultFuture) {
    +
    +		if (numSamples > 0) {
    +			getRpcService().getScheduledExecutor().schedule(() -> runAsync(() -> {
    +				final Optional<StackTraceElement[]> stackTrace = getStackTrace(executionAttemptId, maxStackTraceDepth);
    --- End diff --
    
    The first call to `getStackTrace` should not be delayed. I think it is enough to move this line out of the `schedule` call.
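
A minimal sketch of the intended timing, using a plain `ScheduledExecutorService` rather than Flink's RPC service: the first sample is taken immediately on the calling thread, and only the remaining samples are scheduled with a delay. The `sample` helper and its parameters are hypothetical and exist only for this example; the real `TaskExecutor` code additionally has to run in the endpoint's main thread, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class ImmediateFirstSampleSketch {

    /** Collects numSamples values; only the gaps between samples are delayed. */
    static <T> CompletableFuture<List<T>> sample(
            ScheduledExecutorService executor,
            Supplier<T> sampler,
            int numSamples,
            long delayMs) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        sampleOnce(executor, sampler, numSamples, delayMs,
            new ArrayList<>(Math.max(0, numSamples)), result);
        return result;
    }

    private static <T> void sampleOnce(
            ScheduledExecutorService executor,
            Supplier<T> sampler,
            int remaining,
            long delayMs,
            List<T> samples,
            CompletableFuture<List<T>> result) {
        if (remaining <= 0) {
            result.complete(samples);
            return;
        }
        try {
            // The sample itself is taken right away, outside of schedule(...).
            samples.add(sampler.get());
        } catch (Throwable t) {
            result.completeExceptionally(t);
            return;
        }
        if (remaining == 1) {
            result.complete(samples);
        } else {
            // Only the next sample waits for the delay.
            executor.schedule(
                () -> sampleOnce(executor, sampler, remaining - 1, delayMs, samples, result),
                delayMs,
                TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            List<Long> times = sample(executor, System::nanoTime, 3, 50L)
                .get(5, TimeUnit.SECONDS);
            System.out.println(times.size());
        } finally {
            executor.shutdownNow();
        }
    }
}
```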


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165155912
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java ---
    @@ -74,7 +74,7 @@
     	static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
     
     	/** Expected method name for back pressure indicating stack trace element. */
    -	static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
    +	static final String EXPECTED_METHOD_NAME = "requestBufferBuilderBlocking";
    --- End diff --
    
    Requires a follow-up ticket to make it less fragile.
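
To show why matching on a method name is fragile, here is a sketch of name-based stack trace matching. The two constants are copied from the diff above; the matching logic itself is an assumption made for illustration and is not taken from `BackPressureStatsTracker`. A trace recorded under the old method name silently stops matching after the rename, with no compile-time error.

```java
final class NameBasedMatchSketch {

    // Both constants are copied from the diff above.
    static final String EXPECTED_CLASS_NAME =
        "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
    static final String EXPECTED_METHOD_NAME = "requestBufferBuilderBlocking";

    /** Returns true if any frame matches the expected class and method name. */
    static boolean isBackPressured(StackTraceElement[] trace) {
        for (StackTraceElement frame : trace) {
            if (EXPECTED_CLASS_NAME.equals(frame.getClassName())
                    && EXPECTED_METHOD_NAME.equals(frame.getMethodName())) {
                return true;
            }
        }
        return false;
    }

    /** Builds a synthetic two-frame trace whose top frame uses the given method name. */
    static StackTraceElement[] traceWith(String methodName) {
        return new StackTraceElement[] {
            new StackTraceElement(EXPECTED_CLASS_NAME, methodName, "LocalBufferPool.java", 1),
            new StackTraceElement("java.lang.Thread", "run", "Thread.java", 1)
        };
    }

    public static void main(String[] args) {
        System.out.println(isBackPressured(traceWith("requestBufferBuilderBlocking"))); // true
        System.out.println(isBackPressured(traceWith("requestBufferBlocking")));        // false
    }
}
```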


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r166229314
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/VertexBackPressureStatusTest.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for {@link JobVertexBackPressureInfo.VertexBackPressureStatus}.
    + */
    +public class VertexBackPressureStatusTest {
    --- End diff --
    
    Same here: `extends TestLogger` is missing.


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r166228917
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
    +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
    +import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
    +import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
    +import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureStatus;
    +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.stream.Collectors;
    +
    +import static org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH;
    +import static org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.LOW;
    +import static org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.OK;
    +import static org.hamcrest.Matchers.contains;
    +import static org.hamcrest.Matchers.equalTo;
    +import static org.junit.Assert.assertThat;
    +
    +/**
    + * Tests for {@link JobVertexBackPressureHandler}.
    + */
    +public class JobVertexBackPressureHandlerTest {
    --- End diff --
    
    `extends TestLogger` is missing.


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165292444
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
    +
    +import java.util.List;
    +import java.util.Objects;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Response type of the {@link JobVertexBackPressureHandler}.
    + */
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class JobVertexBackPressureInfo implements ResponseBody {
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +	public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
    +	public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
    +	public static final String FIELD_NAME_SUBTASKS = "subtasks";
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final VertexBackPressureStatus status;
    +
    +	@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
    +	private final VertexBackPressureLevel backpressureLevel;
    +
    +	@JsonProperty(FIELD_NAME_END_TIMESTAMP)
    +	private final Long endTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_SUBTASKS)
    +	protected final List<SubtaskBackPressureInfo> subtasks;
    +
    +	@JsonCreator
    +	public JobVertexBackPressureInfo(
    +		@JsonProperty(FIELD_NAME_STATUS) VertexBackPressureStatus status,
    +		@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) VertexBackPressureLevel backpressureLevel,
    +		@JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
    +		@JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskBackPressureInfo> subtasks) {
    +		this.status = status;
    +		this.backpressureLevel = backpressureLevel;
    +		this.endTimestamp = endTimestamp;
    +		this.subtasks = subtasks;
    +	}
    +
    +	public static JobVertexBackPressureInfo deprecated() {
    +		return new JobVertexBackPressureInfo(
    +			VertexBackPressureStatus.DEPRECATED,
    --- End diff --
    
    Why do we explicitly say that the back pressure information is deprecated? If we still have some older back pressure information, serving it is better than serving nothing, right? The caller could still throw the information away based on `endTimestamp` if it is too old.
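
    A minimal sketch of the caller-side filtering suggested here, assuming the caller knows the current time and a maximum acceptable age. `SampleStats`, `StaleStatsExample`, and `freshOnly` are hypothetical names for illustration only and are not part of Flink:

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for back pressure stats; not a Flink class. */
final class SampleStats {
    final long endTimestamp; // epoch millis at which the sample finished
    final double backPressureRatio;

    SampleStats(long endTimestamp, double backPressureRatio) {
        this.endTimestamp = endTimestamp;
        this.backPressureRatio = backPressureRatio;
    }
}

final class StaleStatsExample {
    /**
     * Keeps the stats only if they are at most maxAgeMillis old.
     * The server can always return what it has; the caller decides what is too old.
     */
    static Optional<SampleStats> freshOnly(
            Optional<SampleStats> stats, long nowMillis, long maxAgeMillis) {
        return stats.filter(s -> nowMillis - s.endTimestamp <= maxAgeMillis);
    }
}
```

    With this shape, a caller that tolerates old data passes a large `maxAgeMillis`, and a strict caller passes a small one; the server does not need a separate "deprecated" status for that decision.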


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165155368
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java ---
    @@ -75,6 +76,17 @@ public void setDisconnectJobManagerConsumer(Consumer<Tuple2<JobID, Throwable>> d
     		return CompletableFuture.completedFuture(Acknowledge.get());
     	}
     
    +	@Override
    +	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    --- End diff --
    
    Needs to be implemented


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5397


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165296332
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -358,6 +361,18 @@ public void start() throws Exception {
     		}
     	}
     
    +	@Override
    +	public CompletableFuture<Optional<OperatorBackPressureStats>> getOperatorBackPressureStats(
    --- End diff --
    
    true, forgot about it.


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165835481
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -305,6 +311,84 @@ public void postStop() throws Exception {
     	//  RPC methods
     	// ======================================================================
     
    +	@Override
    +	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final Time timeout) {
    +		return requestStackTraceSample(
    +			executionAttemptId,
    +			sampleId,
    +			numSamples,
    +			delayBetweenSamples,
    +			maxStackTraceDepth,
    +			new ArrayList<>(numSamples),
    +			new CompletableFuture<>());
    +	}
    +
    +	private CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final List<StackTraceElement[]> currentTraces,
    +			final CompletableFuture<StackTraceSampleResponse> resultFuture) {
    +
    +		if (numSamples > 0) {
    +			getRpcService().getScheduledExecutor().schedule(() -> runAsync(() -> {
    --- End diff --
    
    done


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165296204
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
    +
    +import java.util.List;
    +import java.util.Objects;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Response type of the {@link JobVertexBackPressureHandler}.
    + */
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class JobVertexBackPressureInfo implements ResponseBody {
    +
    +	public static final String FIELD_NAME_STATUS = "status";
    +	public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
    +	public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
    +	public static final String FIELD_NAME_SUBTASKS = "subtasks";
    +
    +	@JsonProperty(FIELD_NAME_STATUS)
    +	private final VertexBackPressureStatus status;
    +
    +	@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
    +	private final VertexBackPressureLevel backpressureLevel;
    +
    +	@JsonProperty(FIELD_NAME_END_TIMESTAMP)
    +	private final Long endTimestamp;
    +
    +	@JsonProperty(FIELD_NAME_SUBTASKS)
    +	protected final List<SubtaskBackPressureInfo> subtasks;
    +
    +	@JsonCreator
    +	public JobVertexBackPressureInfo(
    +		@JsonProperty(FIELD_NAME_STATUS) VertexBackPressureStatus status,
    +		@JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) VertexBackPressureLevel backpressureLevel,
    +		@JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
    +		@JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskBackPressureInfo> subtasks) {
    +		this.status = status;
    +		this.backpressureLevel = backpressureLevel;
    +		this.endTimestamp = endTimestamp;
    +		this.subtasks = subtasks;
    +	}
    +
    +	public static JobVertexBackPressureInfo deprecated() {
    +		return new JobVertexBackPressureInfo(
    +			VertexBackPressureStatus.DEPRECATED,
    --- End diff --
    
    Can do, and I thought about it, but that's how it is done currently. If the status is `deprecated`, the UI will render *Sampling in progress*.


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165835483
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -867,6 +889,25 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
     		}
     	}
     
    +	@Override
    +	public CompletableFuture<Optional<OperatorBackPressureStats>> getOperatorBackPressureStats(
    +			final JobID jobId, final JobVertexID jobVertexId) {
    +		final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
    +		if (jobVertex == null) {
    +			return FutureUtils.completedExceptionally(new FlinkException("JobVertexID not found " +
    +				jobVertexId));
    +		}
    +
    +		final Optional<OperatorBackPressureStats> operatorBackPressureStats =
    +			backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
    +		if (!operatorBackPressureStats.isPresent() ||
    +			backPressureStatsRefreshInterval <= System.currentTimeMillis() - operatorBackPressureStats.get().getEndTimestamp()) {
    +			backPressureStatsTracker.triggerStackTraceSample(jobVertex);
    +			return CompletableFuture.completedFuture(Optional.empty());
    --- End diff --
    
    done


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165835719
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -305,6 +311,84 @@ public void postStop() throws Exception {
     	//  RPC methods
     	// ======================================================================
     
    +	@Override
    +	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final Time timeout) {
    +		return requestStackTraceSample(
    +			executionAttemptId,
    +			sampleId,
    +			numSamples,
    +			delayBetweenSamples,
    +			maxStackTraceDepth,
    +			new ArrayList<>(numSamples),
    +			new CompletableFuture<>());
    +	}
    +
    +	private CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final List<StackTraceElement[]> currentTraces,
    +			final CompletableFuture<StackTraceSampleResponse> resultFuture) {
    +
    +		if (numSamples > 0) {
    +			getRpcService().getScheduledExecutor().schedule(() -> runAsync(() -> {
    +				final Optional<StackTraceElement[]> stackTrace = getStackTrace(executionAttemptId, maxStackTraceDepth);
    --- End diff --
    
    Fixed.


---

[GitHub] flink issue #5397: [FLINK-7856][flip6] Port JobVertexBackPressureHandler to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5397
  
    Changes look good to me. Thanks for your contribution @GJL. Merging this PR.


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r166226318
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---
    @@ -137,11 +151,28 @@ public static JobManagerServices fromConfiguration(
     				Hardware.getNumberCPUCores(),
     				new ExecutorThreadFactory("jobmanager-future"));
     
    +		final StackTraceSampleCoordinator stackTraceSampleCoordinator =
    +			new StackTraceSampleCoordinator(futureExecutor, timeout.toMillis());
    +		final BackPressureStatsTracker backPressureStatsTracker = new BackPressureStatsTracker(
    +			stackTraceSampleCoordinator,
    +			config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
    +			config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
    +			config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
    +			Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY)));
    +
    +		futureExecutor.scheduleWithFixedDelay(
    --- End diff --
    
    Not sure whether we should schedule the cleanup here, because that way we don't have access to the `ScheduledFuture`, which we should cancel once we shut the `BackPressureStatsTracker` down. I think this should happen in the `JobMaster`.
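
    A rough sketch of the ownership pattern meant here, using only the JDK: whoever schedules the periodic cleanup keeps the returned `ScheduledFuture` and cancels it on shutdown. `CleanupOwner` is a hypothetical stand-in for the owning component, not a Flink class:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical owner of a periodic cleanup task; illustrative only. */
final class CleanupOwner implements AutoCloseable {
    // Kept so that the periodic task can be cancelled on shutdown.
    private final ScheduledFuture<?> cleanupFuture;

    CleanupOwner(ScheduledExecutorService executor, Runnable cleanup, long intervalMillis) {
        this.cleanupFuture = executor.scheduleWithFixedDelay(
            cleanup, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    boolean isCleanupCancelled() {
        return cleanupFuture.isCancelled();
    }

    @Override
    public void close() {
        // Stop future runs; 'false' lets a run that is already in progress finish.
        cleanupFuture.cancel(false);
    }
}
```

    If the scheduling call lives in a static factory that discards the return value, nothing can cancel the task later, which is the concern raised above.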


---

[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165294395
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -305,6 +311,84 @@ public void postStop() throws Exception {
     	//  RPC methods
     	// ======================================================================
     
    +	@Override
    +	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final Time timeout) {
    +		return requestStackTraceSample(
    +			executionAttemptId,
    +			sampleId,
    +			numSamples,
    +			delayBetweenSamples,
    +			maxStackTraceDepth,
    +			new ArrayList<>(numSamples),
    +			new CompletableFuture<>());
    +	}
    +
    +	private CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
    +			final ExecutionAttemptID executionAttemptId,
    +			final int sampleId,
    +			final int numSamples,
    +			final Time delayBetweenSamples,
    +			final int maxStackTraceDepth,
    +			final List<StackTraceElement[]> currentTraces,
    +			final CompletableFuture<StackTraceSampleResponse> resultFuture) {
    +
    +		if (numSamples > 0) {
    +			getRpcService().getScheduledExecutor().schedule(() -> runAsync(() -> {
    --- End diff --
    
    This should be replaceable with `this.scheduleRunAsync(() -> ...)`
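
    For context, the delayed, self-rescheduling sampling loop in the diff can be sketched with plain JDK classes as follows. This is an illustration under assumptions, not the PR's code: `StackSampler` and its methods are hypothetical, and inside an RPC endpoint the `executor.schedule(...)` call is the part that a helper such as `scheduleRunAsync` would be expected to replace:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sampler that takes one stack trace per delay without blocking a thread. */
final class StackSampler {

    static CompletableFuture<List<StackTraceElement[]>> sample(
            ScheduledExecutorService executor, Thread target, int numSamples, long delayMillis) {
        CompletableFuture<List<StackTraceElement[]>> result = new CompletableFuture<>();
        List<StackTraceElement[]> traces = new ArrayList<>(Math.max(0, numSamples));
        sampleOnce(executor, target, numSamples, delayMillis, traces, result);
        return result;
    }

    private static void sampleOnce(
            ScheduledExecutorService executor, Thread target, int remaining, long delayMillis,
            List<StackTraceElement[]> traces,
            CompletableFuture<List<StackTraceElement[]>> result) {
        if (remaining <= 0) {
            // All samples collected: hand the accumulated traces to the caller.
            result.complete(traces);
            return;
        }
        executor.schedule(() -> {
            try {
                traces.add(target.getStackTrace());
                // Re-schedule for the next sample instead of sleeping in a loop.
                sampleOnce(executor, target, remaining - 1, delayMillis, traces, result);
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

    Each step runs as its own short task, so no thread sleeps between samples; the result future completes only after the last sample has been taken.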


---