You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/26 13:31:17 UTC

[flink] 05/05: [hotfix][test] Removes unused jobId parameter

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2c737891afde0c63c1a453b1ee164b80b6a702c
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Feb 26 08:35:12 2021 +0100

    [hotfix][test] Removes unused jobId parameter
---
 .../runtime/scheduler/DefaultSchedulerTest.java    | 41 +++++++---------------
 1 file changed, 13 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index ffbbe01..597d71e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -286,8 +286,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final ExecutionAttemptID attemptId =
                 archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
 
         taskRestartExecutor.triggerScheduledTasks();
 
@@ -303,7 +302,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final TaskExecutionState taskExecutionState =
-                createFailedTaskExecutionState(jobGraph.getJobID(), new ExecutionAttemptID());
+                createFailedTaskExecutionState(new ExecutionAttemptID());
 
         assertFalse(scheduler.updateTaskExecutionState(taskExecutionState));
     }
@@ -324,8 +323,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
 
         taskRestartExecutor.triggerScheduledTasks();
 
@@ -463,8 +461,7 @@ public class DefaultSchedulerTest extends TestLogger {
                         .next();
         final ExecutionAttemptID attemptId =
                 sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         testRestartBackoffTimeStrategy.setCanRestart(false);
 
         testExecutionSlotAllocator.enableAutoCompletePendingRequests();
@@ -523,8 +520,7 @@ public class DefaultSchedulerTest extends TestLogger {
                                 .getAllExecutionVertices());
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
 
         taskRestartExecutor.triggerScheduledTasks();
 
@@ -701,15 +697,13 @@ public class DefaultSchedulerTest extends TestLogger {
         // fail v1 and let it recover to SCHEDULED
         // the initial deployment of v1 will be outdated
         scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(
-                        jobGraph.getJobID(), v1.getCurrentExecutionAttempt().getAttemptId()));
+                createFailedTaskExecutionState(v1.getCurrentExecutionAttempt().getAttemptId()));
         taskRestartExecutor.triggerScheduledTasks();
 
         // fail v2 to get all pending slot requests in the initial deployments to be done
         // this triggers the outdated deployment of v1
         scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(
-                        jobGraph.getJobID(), v2.getCurrentExecutionAttempt().getAttemptId()));
+                createFailedTaskExecutionState(v2.getCurrentExecutionAttempt().getAttemptId()));
 
         // v1 should not be affected
         assertThat(sv1.getState(), is(equalTo(ExecutionState.SCHEDULED)));
@@ -741,8 +735,7 @@ public class DefaultSchedulerTest extends TestLogger {
         checkpointTriggeredLatch.await();
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(1)));
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
@@ -780,8 +773,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
         acknowledgePendingCheckpoint(scheduler, checkpointId);
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
         assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
     }
@@ -821,8 +813,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
         acknowledgePendingCheckpoint(scheduler, checkpointId);
 
-        scheduler.updateTaskExecutionState(
-                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
         final List<ExecutionVertexID> deployedExecutionVertices =
                 testExecutionVertexOperations.getDeployedVertices();
@@ -888,7 +879,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
-        final JobID jobId = jobGraph.getJobID();
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final Iterator<ArchivedExecutionVertex> vertexIterator =
@@ -923,7 +913,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void cancelWhileRestartingShouldWaitForRunningTasks() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
-        final JobID jobid = jobGraph.getJobID();
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
         final SchedulingTopology topology = scheduler.getSchedulingTopology();
 
@@ -959,7 +948,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void failureInfoIsSetAfterTaskFailure() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
-        final JobID jobId = jobGraph.getJobID();
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final ArchivedExecutionVertex onlyExecutionVertex =
@@ -985,7 +973,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
-        final JobID jobId = jobGraph.getJobID();
         testExecutionSlotAllocator.disableAutoCompletePendingRequests();
 
         final DefaultScheduler scheduler =
@@ -1066,7 +1053,6 @@ public class DefaultSchedulerTest extends TestLogger {
     @Test
     public void testExceptionHistoryWithRestartableFailure() {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
-        final JobID jobId = jobGraph.getJobID();
 
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
@@ -1081,7 +1067,7 @@ public class DefaultSchedulerTest extends TestLogger {
                         .getAttemptId();
         final RuntimeException restartableException = new RuntimeException("restartable exception");
         Range<Long> updateStateTriggeringRestartTimeframe =
-                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+                initiateFailure(scheduler, restartableAttemptId, restartableException);
 
         taskRestartExecutor.triggerNonPeriodicScheduledTask();
 
@@ -1098,7 +1084,7 @@ public class DefaultSchedulerTest extends TestLogger {
                         .getAttemptId();
         final RuntimeException failingException = new RuntimeException("failing exception");
         Range<Long> updateStateTriggeringJobFailureTimeframe =
-                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+                initiateFailure(scheduler, failingAttemptId, failingException);
 
         List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
         assertThat(actualExceptionHistory.size(), is(2));
@@ -1210,14 +1196,13 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     private static TaskExecutionState createFailedTaskExecutionState(
-            JobID jobId, ExecutionAttemptID executionAttemptID) {
+            ExecutionAttemptID executionAttemptID) {
         return new TaskExecutionState(
                 executionAttemptID, ExecutionState.FAILED, new Exception("Expected failure cause"));
     }
 
     private static Range<Long> initiateFailure(
             DefaultScheduler scheduler,
-            JobID jobId,
             ExecutionAttemptID executionAttemptID,
             Throwable exception) {
         long start = System.currentTimeMillis();