Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/23 09:56:26 UTC

[GitHub] [flink] zhuzhurk opened a new pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

zhuzhurk opened a new pull request #13766:
URL: https://github.com/apache/flink/pull/13766


   ## What is the purpose of the change
   
   `Execution#maybeReleasePartitionsAndSendCancelRpcCall(...)` is not invoked when a task is reported as failed by the TaskManager, so its partitions remain tracked by the JobManager partition tracker. This change untracks a result partition when its producer task has failed in the TaskManager.
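
The intended behavior can be illustrated with a minimal sketch. It assumes a heavily simplified tracker; `PartitionTrackerSketch`, `FailureHandlerSketch` and all of their methods are hypothetical names for illustration and are not Flink's actual classes or API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for the JobManager-side partition tracker (hypothetical, not Flink's API). */
class PartitionTrackerSketch {

    // producer attempt id -> ids of the result partitions it produced
    private final Map<String, Set<String>> partitionsByProducer = new HashMap<>();

    void startTracking(String producerAttemptId, String partitionId) {
        partitionsByProducer
                .computeIfAbsent(producerAttemptId, k -> new HashSet<>())
                .add(partitionId);
    }

    /** Forgets all partitions of the given producer and returns what was tracked. */
    Set<String> stopTracking(String producerAttemptId) {
        Set<String> removed = partitionsByProducer.remove(producerAttemptId);
        return removed == null ? new HashSet<>() : removed;
    }

    boolean isTracking(String producerAttemptId) {
        return partitionsByProducer.containsKey(producerAttemptId);
    }
}

/** Toy failure handler: both failure paths untrack the producer's partitions. */
class FailureHandlerSketch {

    private final PartitionTrackerSketch tracker;

    // Counts simulated cancel RPCs sent to the TaskManager.
    int cancelRpcCalls = 0;

    FailureHandlerSketch(PartitionTrackerSketch tracker) {
        this.tracker = tracker;
    }

    /** Failure fired by the JobManager for a deployed task: cancel it and untrack. */
    void onFailureTriggeredByJobManager(String attemptId) {
        cancelRpcCalls++;
        tracker.stopTracking(attemptId);
    }

    /** Failure reported by the TaskManager: no cancel RPC is sent, but still untrack. */
    void onFailureReportedByTaskManager(String attemptId) {
        tracker.stopTracking(attemptId);
    }
}
```

In this sketch the second path is the one that was missing before: a failure reported by the TaskManager previously left the producer's partitions in the tracker.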
   
   ## Verifying this change
   
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13766:
URL: https://github.com/apache/flink/pull/13766#issuecomment-715255564


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8175",
       "triggerID" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 42f066384d04bcfe67ccbb5766f09ab5dde9e19c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8175) 
   * b46f97dd6b1fa67b4dd169867c1967877ac8534c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13766:
URL: https://github.com/apache/flink/pull/13766#discussion_r514906398



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {

Review comment:
       I did not have a good name for it, and `TaskExecutionStateTransition` sounds good to me. 







[GitHub] [flink] zhuzhurk commented on a change in pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13766:
URL: https://github.com/apache/flink/pull/13766#discussion_r514957052



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
##########
@@ -84,7 +85,11 @@
 
 	void handleGlobalFailure(Throwable cause);
 
-	boolean updateTaskExecutionState(TaskExecutionState taskExecutionState);
+	default boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		return updateTaskExecutionState(TaskExecutionStateWrapper.createFrom(taskExecutionState));

Review comment:
       You are right. However, this method will be removed soon, when the legacy scheduling in ExecutionGraph is removed (FLINK-15626). I would prefer not to refactor its usages right now, to avoid massive conflicts with the ongoing change in FLINK-17760.
   How about I mark it as deprecated for now to prevent further use?
   WDYT?
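
A rough sketch of what deprecating the legacy overload could look like, assuming simplified stand-in types. `SchedulerSketch`, `TaskExecutionStateSketch` and `TaskExecutionStateTransitionSketch` are hypothetical names that only mirror the shape of the diff above; they are not the real Flink classes.

```java
/** Hypothetical stand-in for TaskExecutionState, reduced to the state name. */
class TaskExecutionStateSketch {
    final String state;

    TaskExecutionStateSketch(String state) {
        this.state = state;
    }
}

/** Hypothetical stand-in for the wrapper type introduced in this pull request. */
class TaskExecutionStateTransitionSketch {
    final TaskExecutionStateSketch taskExecutionState;

    TaskExecutionStateTransitionSketch(TaskExecutionStateSketch taskExecutionState) {
        this.taskExecutionState = taskExecutionState;
    }
}

interface SchedulerSketch {

    /** New entry point that implementations have to provide. */
    boolean updateTaskExecutionState(TaskExecutionStateTransitionSketch transition);

    /**
     * Legacy overload kept only so that existing callers keep compiling.
     *
     * @deprecated use the overload that takes a transition instead.
     */
    @Deprecated
    default boolean updateTaskExecutionState(TaskExecutionStateSketch taskExecutionState) {
        // Delegates to the new overload by wrapping the plain state.
        return updateTaskExecutionState(
                new TaskExecutionStateTransitionSketch(taskExecutionState));
    }
}
```

With this shape, existing callers keep working unchanged, while the compiler flags every remaining use of the old overload as deprecated.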










[GitHub] [flink] flinkbot commented on pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13766:
URL: https://github.com/apache/flink/pull/13766#issuecomment-715238294


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 42f066384d04bcfe67ccbb5766f09ab5dde9e19c (Fri Oct 23 09:58:56 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13766:
URL: https://github.com/apache/flink/pull/13766#discussion_r514953464



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {
+
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Indicating whether to send an RPC call to remove task from TaskManager.
+	 * True if the failure is fired by JobManager and the execution is already
+	 * deployed. Otherwise it should be false.
+	 */
+	private final boolean cancelTask;
+
+	private final boolean releasePartitions;
+
+	private TaskExecutionStateWrapper(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		this.taskExecutionState = checkNotNull(taskExecutionState);
+		this.cancelTask = cancelTask;
+		this.releasePartitions = releasePartitions;
+	}
+
+	public Throwable getError(ClassLoader userCodeClassloader) {
+		return taskExecutionState.getError(userCodeClassloader);
+	}
+
+	public ExecutionAttemptID getID() {
+		return taskExecutionState.getID();
+	}
+
+	public ExecutionState getExecutionState() {
+		return taskExecutionState.getExecutionState();
+	}
+
+	public JobID getJobID() {
+		return taskExecutionState.getJobID();
+	}
+
+	public AccumulatorSnapshot getAccumulators() {
+		return taskExecutionState.getAccumulators();
+	}
+
+	public IOMetrics getIOMetrics() {
+		return taskExecutionState.getIOMetrics();
+	}
+
+	public boolean getCancelTask() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return cancelTask;
+	}
+
+	public boolean getReleasePartitions() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return releasePartitions;
+	}
+
+	public static TaskExecutionStateWrapper createFrom(TaskExecutionState taskExecutionState) {
+		return new TaskExecutionStateWrapper(taskExecutionState, false, false);
+	}
+
+	public static TaskExecutionStateWrapper createFromFailedState(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);

Review comment:
       I will remove the check and the factory methods. See comment https://github.com/apache/flink/pull/13766#discussion_r514952083.







[GitHub] [flink] flinkbot edited a comment on pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13766:
URL: https://github.com/apache/flink/pull/13766#issuecomment-715255564


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8175",
       "triggerID" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8263",
       "triggerID" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8fe7e75c390442fa9aab16ab05c3cc5e0fc6d553",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8669",
       "triggerID" : "8fe7e75c390442fa9aab16ab05c3cc5e0fc6d553",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b46f97dd6b1fa67b4dd169867c1967877ac8534c Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8263) 
   * 8fe7e75c390442fa9aab16ab05c3cc5e0fc6d553 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8669) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13766:
URL: https://github.com/apache/flink/pull/13766#discussion_r514952083



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {
+
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Indicating whether to send an RPC call to remove task from TaskManager.
+	 * True if the failure is fired by JobManager and the execution is already
+	 * deployed. Otherwise it should be false.
+	 */
+	private final boolean cancelTask;
+
+	private final boolean releasePartitions;
+
+	private TaskExecutionStateWrapper(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		this.taskExecutionState = checkNotNull(taskExecutionState);
+		this.cancelTask = cancelTask;
+		this.releasePartitions = releasePartitions;
+	}
+
+	public Throwable getError(ClassLoader userCodeClassloader) {
+		return taskExecutionState.getError(userCodeClassloader);
+	}
+
+	public ExecutionAttemptID getID() {
+		return taskExecutionState.getID();
+	}
+
+	public ExecutionState getExecutionState() {
+		return taskExecutionState.getExecutionState();
+	}
+
+	public JobID getJobID() {
+		return taskExecutionState.getJobID();
+	}
+
+	public AccumulatorSnapshot getAccumulators() {
+		return taskExecutionState.getAccumulators();
+	}
+
+	public IOMetrics getIOMetrics() {
+		return taskExecutionState.getIOMetrics();
+	}
+
+	public boolean getCancelTask() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);

Review comment:
       Ok. I will remove the check. Also regarding your other comments, I think we do not need factory methods to differentiate whether it is created for FAILED state. I will add 2 constructors instead. One needs to specify the `cancelTask/releasePartitions` flags if they should be `true`.
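
A minimal sketch of the two-constructor shape described here, assuming simplified stand-in types. `StateSketch` and `StateTransitionSketch` are hypothetical names for illustration; the real class wraps `TaskExecutionState` and exposes more getters.

```java
import java.util.Objects;

/** Hypothetical stand-in for TaskExecutionState, reduced to the state name. */
class StateSketch {
    final String executionState; // e.g. "RUNNING" or "FAILED"

    StateSketch(String executionState) {
        this.executionState = executionState;
    }
}

/** Hypothetical wrapper with two constructors instead of factory methods. */
class StateTransitionSketch {

    private final StateSketch state;
    private final boolean cancelTask;
    private final boolean releasePartitions;

    /** Common case: no extra actions, so both flags default to false. */
    StateTransitionSketch(StateSketch state) {
        this(state, false, false);
    }

    /** Only needed when at least one of the flags should be true. */
    StateTransitionSketch(StateSketch state, boolean cancelTask, boolean releasePartitions) {
        this.state = Objects.requireNonNull(state);
        this.cancelTask = cancelTask;
        this.releasePartitions = releasePartitions;
    }

    String getExecutionState() {
        return state.executionState;
    }

    // Plain getters: no check that the state is FAILED before returning a flag.
    boolean getCancelTask() {
        return cancelTask;
    }

    boolean getReleasePartitions() {
        return releasePartitions;
    }
}
```

Because the getters no longer assert a FAILED state, callers can read the flags for any state and simply get `false` unless the three-argument constructor was used.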







[GitHub] [flink] flinkbot edited a comment on pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13766:
URL: https://github.com/apache/flink/pull/13766#issuecomment-715255564


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8175",
       "triggerID" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8263",
       "triggerID" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8fe7e75c390442fa9aab16ab05c3cc5e0fc6d553",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8fe7e75c390442fa9aab16ab05c3cc5e0fc6d553",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b46f97dd6b1fa67b4dd169867c1967877ac8534c Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8263) 
   * 8fe7e75c390442fa9aab16ab05c3cc5e0fc6d553 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13766:
URL: https://github.com/apache/flink/pull/13766#issuecomment-715255564


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8175",
       "triggerID" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8263",
       "triggerID" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 42f066384d04bcfe67ccbb5766f09ab5dde9e19c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8175) 
   * b46f97dd6b1fa67b4dd169867c1967877ac8534c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8263) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk merged pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk merged pull request #13766:
URL: https://github.com/apache/flink/pull/13766


   








[GitHub] [flink] zhuzhurk commented on a change in pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13766:
URL: https://github.com/apache/flink/pull/13766#discussion_r514959822



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {
+
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Indicating whether to send a PRC call to remove task from TaskManager.
+	 * True if the failure is fired by JobManager and the execution is already
+	 * deployed. Otherwise it should be false.
+	 */
+	private final boolean cancelTask;
+
+	private final boolean releasePartitions;
+
+	private TaskExecutionStateWrapper(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		this.taskExecutionState = checkNotNull(taskExecutionState);
+		this.cancelTask = cancelTask;
+		this.releasePartitions = releasePartitions;
+	}
+
+	public Throwable getError(ClassLoader userCodeClassloader) {
+		return taskExecutionState.getError(userCodeClassloader);
+	}
+
+	public ExecutionAttemptID getID() {
+		return taskExecutionState.getID();
+	}
+
+	public ExecutionState getExecutionState() {
+		return taskExecutionState.getExecutionState();
+	}
+
+	public JobID getJobID() {
+		return taskExecutionState.getJobID();
+	}
+
+	public AccumulatorSnapshot getAccumulators() {
+		return taskExecutionState.getAccumulators();
+	}
+
+	public IOMetrics getIOMetrics() {
+		return taskExecutionState.getIOMetrics();
+	}
+
+	public boolean getCancelTask() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return cancelTask;
+	}
+
+	public boolean getReleasePartitions() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return releasePartitions;
+	}
+
+	public static TaskExecutionStateWrapper createFrom(TaskExecutionState taskExecutionState) {
+		return new TaskExecutionStateWrapper(taskExecutionState, false, false);
+	}
+
+	public static TaskExecutionStateWrapper createFromFailedState(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);

Review comment:
       Just a note that `createFrom()` will also be used for a FAILED state that is reported from the TaskManager, because the flags will be false in that case.







[GitHub] [flink] flinkbot edited a comment on pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13766:
URL: https://github.com/apache/flink/pull/13766#issuecomment-715255564


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8175",
       "triggerID" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 42f066384d04bcfe67ccbb5766f09ab5dde9e19c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8175) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13766:
URL: https://github.com/apache/flink/pull/13766#discussion_r514957052



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
##########
@@ -84,7 +85,11 @@
 
 	void handleGlobalFailure(Throwable cause);
 
-	boolean updateTaskExecutionState(TaskExecutionState taskExecutionState);
+	default boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		return updateTaskExecutionState(TaskExecutionStateWrapper.createFrom(taskExecutionState));

Review comment:
       You are right. However, this method will be removed soon, once the legacy scheduling in ExecutionGraph is removed (FLINK-15626). I would prefer not to refactor its usages right now, to avoid massive conflicts with the ongoing change in FLINK-17760, which will replace the usage with `SchedulerNG#updateTaskExecutionState()`.
       How about I mark it as deprecated for now to prevent further use?
       WDYT?
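
The deprecation idea could look roughly like the sketch below. `SchedulerSketch`, `RawState` and `WrappedState` are hypothetical stand-ins, not the real `SchedulerNG`, `TaskExecutionState` or wrapper types; the sketch only shows a `@Deprecated` default overload that delegates with both flags left `false`.

```java
// Hypothetical stand-in for TaskExecutionState (assumption for illustration).
final class RawState {
    final String state;

    RawState(String state) {
        this.state = state;
    }
}

// Hypothetical stand-in for the wrapper that carries the two flags.
final class WrappedState {
    final RawState raw;
    final boolean cancelTask;
    final boolean releasePartitions;

    WrappedState(RawState raw, boolean cancelTask, boolean releasePartitions) {
        this.raw = raw;
        this.cancelTask = cancelTask;
        this.releasePartitions = releasePartitions;
    }
}

// Hypothetical scheduler interface mirroring the overload pair in the diff.
interface SchedulerSketch {

    /**
     * Legacy entry point: always wraps the state with both flags false.
     *
     * @deprecated use the overload that takes the wrapped state.
     */
    @Deprecated
    default boolean updateTaskExecutionState(RawState raw) {
        return updateTaskExecutionState(new WrappedState(raw, false, false));
    }

    // New entry point that implementations must provide.
    boolean updateTaskExecutionState(WrappedState wrapped);
}
```

The annotation does not change behavior; it only makes the compiler warn at new call sites of the legacy overload.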







[GitHub] [flink] zentol commented on a change in pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #13766:
URL: https://github.com/apache/flink/pull/13766#discussion_r514571998



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {
+
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Indicating whether to send a PRC call to remove task from TaskManager.

Review comment:
       PRC  -> RPC

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {
+
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Indicating whether to send a PRC call to remove task from TaskManager.
+	 * True if the failure is fired by JobManager and the execution is already
+	 * deployed. Otherwise it should be false.
+	 */
+	private final boolean cancelTask;
+
+	private final boolean releasePartitions;
+
+	private TaskExecutionStateWrapper(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		this.taskExecutionState = checkNotNull(taskExecutionState);
+		this.cancelTask = cancelTask;
+		this.releasePartitions = releasePartitions;
+	}
+
+	public Throwable getError(ClassLoader userCodeClassloader) {
+		return taskExecutionState.getError(userCodeClassloader);
+	}
+
+	public ExecutionAttemptID getID() {
+		return taskExecutionState.getID();
+	}
+
+	public ExecutionState getExecutionState() {
+		return taskExecutionState.getExecutionState();
+	}
+
+	public JobID getJobID() {
+		return taskExecutionState.getJobID();
+	}
+
+	public AccumulatorSnapshot getAccumulators() {
+		return taskExecutionState.getAccumulators();
+	}
+
+	public IOMetrics getIOMetrics() {
+		return taskExecutionState.getIOMetrics();
+	}
+
+	public boolean getCancelTask() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);

Review comment:
       why should these only be queryable if the state is failed? I get the _idea_, but imo such a simple data-structure shouldn't enforce behaviors.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {

Review comment:
       a more apt name might be "TaskExecutionStateTransition"

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {
+
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Indicating whether to send a PRC call to remove task from TaskManager.
+	 * True if the failure is fired by JobManager and the execution is already
+	 * deployed. Otherwise it should be false.
+	 */
+	private final boolean cancelTask;
+
+	private final boolean releasePartitions;
+
+	private TaskExecutionStateWrapper(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		this.taskExecutionState = checkNotNull(taskExecutionState);
+		this.cancelTask = cancelTask;
+		this.releasePartitions = releasePartitions;
+	}
+
+	public Throwable getError(ClassLoader userCodeClassloader) {
+		return taskExecutionState.getError(userCodeClassloader);
+	}
+
+	public ExecutionAttemptID getID() {
+		return taskExecutionState.getID();
+	}
+
+	public ExecutionState getExecutionState() {
+		return taskExecutionState.getExecutionState();
+	}
+
+	public JobID getJobID() {
+		return taskExecutionState.getJobID();
+	}
+
+	public AccumulatorSnapshot getAccumulators() {
+		return taskExecutionState.getAccumulators();
+	}
+
+	public IOMetrics getIOMetrics() {
+		return taskExecutionState.getIOMetrics();
+	}
+
+	public boolean getCancelTask() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return cancelTask;
+	}
+
+	public boolean getReleasePartitions() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return releasePartitions;
+	}
+
+	public static TaskExecutionStateWrapper createFrom(TaskExecutionState taskExecutionState) {
+		return new TaskExecutionStateWrapper(taskExecutionState, false, false);
+	}
+
+	public static TaskExecutionStateWrapper createFromFailedState(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);

Review comment:
       it is not intuitive that this method checks for a failed state, but the plain `createFrom()` does not assert the opposite (i.e., that the task did not fail).
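
To make the asymmetry concrete, here is a small self-contained sketch. `Phase` and `WrapperSketch` are hypothetical stand-ins that mirror the two factory methods in the diff; they are not Flink classes, and the explicit `IllegalStateException` stands in for `checkState`.

```java
// Hypothetical stand-in for ExecutionState (assumption for illustration).
enum Phase { RUNNING, FAILED }

// Hypothetical wrapper mirroring the two factory methods under review.
final class WrapperSketch {
    final Phase phase;
    final boolean cancelTask;
    final boolean releasePartitions;

    private WrapperSketch(Phase phase, boolean cancelTask, boolean releasePartitions) {
        this.phase = phase;
        this.cancelTask = cancelTask;
        this.releasePartitions = releasePartitions;
    }

    // Accepts every state, including FAILED, and leaves both flags false.
    static WrapperSketch createFrom(Phase phase) {
        return new WrapperSketch(phase, false, false);
    }

    // Rejects every state except FAILED.
    static WrapperSketch createFromFailedState(
            Phase phase, boolean cancelTask, boolean releasePartitions) {
        if (phase != Phase.FAILED) {
            throw new IllegalStateException("expected FAILED but was " + phase);
        }
        return new WrapperSketch(phase, cancelTask, releasePartitions);
    }
}
```

In this sketch, `createFrom(Phase.FAILED)` succeeds while `createFromFailedState(Phase.RUNNING, ...)` throws, so only one of the two factories constrains the state it accepts.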

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {
+
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Indicating whether to send a PRC call to remove task from TaskManager.
+	 * True if the failure is fired by JobManager and the execution is already
+	 * deployed. Otherwise it should be false.
+	 */
+	private final boolean cancelTask;
+
+	private final boolean releasePartitions;
+
+	private TaskExecutionStateWrapper(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		this.taskExecutionState = checkNotNull(taskExecutionState);
+		this.cancelTask = cancelTask;
+		this.releasePartitions = releasePartitions;
+	}
+
+	public Throwable getError(ClassLoader userCodeClassloader) {
+		return taskExecutionState.getError(userCodeClassloader);
+	}
+
+	public ExecutionAttemptID getID() {
+		return taskExecutionState.getID();
+	}
+
+	public ExecutionState getExecutionState() {
+		return taskExecutionState.getExecutionState();
+	}
+
+	public JobID getJobID() {
+		return taskExecutionState.getJobID();
+	}
+
+	public AccumulatorSnapshot getAccumulators() {
+		return taskExecutionState.getAccumulators();
+	}
+
+	public IOMetrics getIOMetrics() {
+		return taskExecutionState.getIOMetrics();
+	}
+
+	public boolean getCancelTask() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return cancelTask;
+	}
+
+	public boolean getReleasePartitions() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);

Review comment:
       why should these only be queryable if the state is failed?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
##########
@@ -84,7 +85,11 @@
 
 	void handleGlobalFailure(Throwable cause);
 
-	boolean updateTaskExecutionState(TaskExecutionState taskExecutionState);
+	default boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		return updateTaskExecutionState(TaskExecutionStateWrapper.createFrom(taskExecutionState));

Review comment:
       This seems like a dangerous default, and I wouldn't be surprised if we at some point call this by accident instead of creating a failed `TaskExecutionStateWrapper`







[GitHub] [flink] zhuzhurk commented on pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #13766:
URL: https://github.com/apache/flink/pull/13766#issuecomment-720483567


   Thanks for reviewing @zentol 
   Merging.





[GitHub] [flink] flinkbot commented on pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13766:
URL: https://github.com/apache/flink/pull/13766#issuecomment-715255564


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 42f066384d04bcfe67ccbb5766f09ab5dde9e19c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13766:
URL: https://github.com/apache/flink/pull/13766#issuecomment-715255564


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8175",
       "triggerID" : "42f066384d04bcfe67ccbb5766f09ab5dde9e19c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8263",
       "triggerID" : "b46f97dd6b1fa67b4dd169867c1967877ac8534c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b46f97dd6b1fa67b4dd169867c1967877ac8534c Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8263) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk commented on a change in pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13766:
URL: https://github.com/apache/flink/pull/13766#discussion_r514953283



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {
+
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Indicating whether to send a PRC call to remove task from TaskManager.
+	 * True if the failure is fired by JobManager and the execution is already
+	 * deployed. Otherwise it should be false.
+	 */
+	private final boolean cancelTask;
+
+	private final boolean releasePartitions;
+
+	private TaskExecutionStateWrapper(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		this.taskExecutionState = checkNotNull(taskExecutionState);
+		this.cancelTask = cancelTask;
+		this.releasePartitions = releasePartitions;
+	}
+
+	public Throwable getError(ClassLoader userCodeClassloader) {
+		return taskExecutionState.getError(userCodeClassloader);
+	}
+
+	public ExecutionAttemptID getID() {
+		return taskExecutionState.getID();
+	}
+
+	public ExecutionState getExecutionState() {
+		return taskExecutionState.getExecutionState();
+	}
+
+	public JobID getJobID() {
+		return taskExecutionState.getJobID();
+	}
+
+	public AccumulatorSnapshot getAccumulators() {
+		return taskExecutionState.getAccumulators();
+	}
+
+	public IOMetrics getIOMetrics() {
+		return taskExecutionState.getIOMetrics();
+	}
+
+	public boolean getCancelTask() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return cancelTask;
+	}
+
+	public boolean getReleasePartitions() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);

Review comment:
       I will remove the check. For more details, see comment https://github.com/apache/flink/pull/13766#discussion_r514952083.







[GitHub] [flink] zhuzhurk commented on a change in pull request #13766: [FLINK-19703][runtime] Untrack a result partition if its producer task failed in TaskManager

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on a change in pull request #13766:
URL: https://github.com/apache/flink/pull/13766#discussion_r514959822



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateWrapper.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wraps {@link TaskExecutionState}, along with actions to take
+ * if it is FAILED state.
+ */
+public class TaskExecutionStateWrapper {
+
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Indicating whether to send a PRC call to remove task from TaskManager.
+	 * True if the failure is fired by JobManager and the execution is already
+	 * deployed. Otherwise it should be false.
+	 */
+	private final boolean cancelTask;
+
+	private final boolean releasePartitions;
+
+	private TaskExecutionStateWrapper(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		this.taskExecutionState = checkNotNull(taskExecutionState);
+		this.cancelTask = cancelTask;
+		this.releasePartitions = releasePartitions;
+	}
+
+	public Throwable getError(ClassLoader userCodeClassloader) {
+		return taskExecutionState.getError(userCodeClassloader);
+	}
+
+	public ExecutionAttemptID getID() {
+		return taskExecutionState.getID();
+	}
+
+	public ExecutionState getExecutionState() {
+		return taskExecutionState.getExecutionState();
+	}
+
+	public JobID getJobID() {
+		return taskExecutionState.getJobID();
+	}
+
+	public AccumulatorSnapshot getAccumulators() {
+		return taskExecutionState.getAccumulators();
+	}
+
+	public IOMetrics getIOMetrics() {
+		return taskExecutionState.getIOMetrics();
+	}
+
+	public boolean getCancelTask() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return cancelTask;
+	}
+
+	public boolean getReleasePartitions() {
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);
+		return releasePartitions;
+	}
+
+	public static TaskExecutionStateWrapper createFrom(TaskExecutionState taskExecutionState) {
+		return new TaskExecutionStateWrapper(taskExecutionState, false, false);
+	}
+
+	public static TaskExecutionStateWrapper createFromFailedState(
+			final TaskExecutionState taskExecutionState,
+			final boolean cancelTask,
+			final boolean releasePartitions) {
+
+		checkState(taskExecutionState.getExecutionState() == ExecutionState.FAILED);

Review comment:
       Just a note that `createFrom()` will also be used for a FAILED state that is reported from the TaskManager, because the flags will be false in that case.



