You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/26 13:31:17 UTC
[flink] 05/05: [hotfix][test] Removes unused jobId parameter
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a2c737891afde0c63c1a453b1ee164b80b6a702c
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Feb 26 08:35:12 2021 +0100
[hotfix][test] Removes unused jobId parameter
---
.../runtime/scheduler/DefaultSchedulerTest.java | 41 +++++++---------------
1 file changed, 13 insertions(+), 28 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index ffbbe01..597d71e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -286,8 +286,7 @@ public class DefaultSchedulerTest extends TestLogger {
final ExecutionAttemptID attemptId =
archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
- scheduler.updateTaskExecutionState(
- createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+ scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
@@ -303,7 +302,7 @@ public class DefaultSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final TaskExecutionState taskExecutionState =
- createFailedTaskExecutionState(jobGraph.getJobID(), new ExecutionAttemptID());
+ createFailedTaskExecutionState(new ExecutionAttemptID());
assertFalse(scheduler.updateTaskExecutionState(taskExecutionState));
}
@@ -324,8 +323,7 @@ public class DefaultSchedulerTest extends TestLogger {
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
- scheduler.updateTaskExecutionState(
- createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+ scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
@@ -463,8 +461,7 @@ public class DefaultSchedulerTest extends TestLogger {
.next();
final ExecutionAttemptID attemptId =
sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
- scheduler.updateTaskExecutionState(
- createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+ scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
testRestartBackoffTimeStrategy.setCanRestart(false);
testExecutionSlotAllocator.enableAutoCompletePendingRequests();
@@ -523,8 +520,7 @@ public class DefaultSchedulerTest extends TestLogger {
.getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
- scheduler.updateTaskExecutionState(
- createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+ scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
@@ -701,15 +697,13 @@ public class DefaultSchedulerTest extends TestLogger {
// fail v1 and let it recover to SCHEDULED
// the initial deployment of v1 will be outdated
scheduler.updateTaskExecutionState(
- createFailedTaskExecutionState(
- jobGraph.getJobID(), v1.getCurrentExecutionAttempt().getAttemptId()));
+ createFailedTaskExecutionState(v1.getCurrentExecutionAttempt().getAttemptId()));
taskRestartExecutor.triggerScheduledTasks();
// fail v2 to get all pending slot requests in the initial deployments to be done
// this triggers the outdated deployment of v1
scheduler.updateTaskExecutionState(
- createFailedTaskExecutionState(
- jobGraph.getJobID(), v2.getCurrentExecutionAttempt().getAttemptId()));
+ createFailedTaskExecutionState(v2.getCurrentExecutionAttempt().getAttemptId()));
// v1 should not be affected
assertThat(sv1.getState(), is(equalTo(ExecutionState.SCHEDULED)));
@@ -741,8 +735,7 @@ public class DefaultSchedulerTest extends TestLogger {
checkpointTriggeredLatch.await();
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(1)));
- scheduler.updateTaskExecutionState(
- createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+ scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
}
@@ -780,8 +773,7 @@ public class DefaultSchedulerTest extends TestLogger {
checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
acknowledgePendingCheckpoint(scheduler, checkpointId);
- scheduler.updateTaskExecutionState(
- createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+ scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
}
@@ -821,8 +813,7 @@ public class DefaultSchedulerTest extends TestLogger {
checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
acknowledgePendingCheckpoint(scheduler, checkpointId);
- scheduler.updateTaskExecutionState(
- createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+ scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
taskRestartExecutor.triggerScheduledTasks();
final List<ExecutionVertexID> deployedExecutionVertices =
testExecutionVertexOperations.getDeployedVertices();
@@ -888,7 +879,6 @@ public class DefaultSchedulerTest extends TestLogger {
@Test
public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
final JobGraph jobGraph = singleJobVertexJobGraph(2);
- final JobID jobId = jobGraph.getJobID();
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final Iterator<ArchivedExecutionVertex> vertexIterator =
@@ -923,7 +913,6 @@ public class DefaultSchedulerTest extends TestLogger {
@Test
public void cancelWhileRestartingShouldWaitForRunningTasks() {
final JobGraph jobGraph = singleJobVertexJobGraph(2);
- final JobID jobid = jobGraph.getJobID();
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final SchedulingTopology topology = scheduler.getSchedulingTopology();
@@ -959,7 +948,6 @@ public class DefaultSchedulerTest extends TestLogger {
@Test
public void failureInfoIsSetAfterTaskFailure() {
final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
- final JobID jobId = jobGraph.getJobID();
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final ArchivedExecutionVertex onlyExecutionVertex =
@@ -985,7 +973,6 @@ public class DefaultSchedulerTest extends TestLogger {
@Test
public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception {
final JobGraph jobGraph = singleJobVertexJobGraph(2);
- final JobID jobId = jobGraph.getJobID();
testExecutionSlotAllocator.disableAutoCompletePendingRequests();
final DefaultScheduler scheduler =
@@ -1066,7 +1053,6 @@ public class DefaultSchedulerTest extends TestLogger {
@Test
public void testExceptionHistoryWithRestartableFailure() {
final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
- final JobID jobId = jobGraph.getJobID();
final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
@@ -1081,7 +1067,7 @@ public class DefaultSchedulerTest extends TestLogger {
.getAttemptId();
final RuntimeException restartableException = new RuntimeException("restartable exception");
Range<Long> updateStateTriggeringRestartTimeframe =
- initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+ initiateFailure(scheduler, restartableAttemptId, restartableException);
taskRestartExecutor.triggerNonPeriodicScheduledTask();
@@ -1098,7 +1084,7 @@ public class DefaultSchedulerTest extends TestLogger {
.getAttemptId();
final RuntimeException failingException = new RuntimeException("failing exception");
Range<Long> updateStateTriggeringJobFailureTimeframe =
- initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+ initiateFailure(scheduler, failingAttemptId, failingException);
List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
assertThat(actualExceptionHistory.size(), is(2));
@@ -1210,14 +1196,13 @@ public class DefaultSchedulerTest extends TestLogger {
}
private static TaskExecutionState createFailedTaskExecutionState(
- JobID jobId, ExecutionAttemptID executionAttemptID) {
+ ExecutionAttemptID executionAttemptID) {
return new TaskExecutionState(
executionAttemptID, ExecutionState.FAILED, new Exception("Expected failure cause"));
}
private static Range<Long> initiateFailure(
DefaultScheduler scheduler,
- JobID jobId,
ExecutionAttemptID executionAttemptID,
Throwable exception) {
long start = System.currentTimeMillis();