You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/04/19 09:26:10 UTC

[iotdb] branch feature/mpp-sche-clean created (now e61bffb4d2)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a change to branch feature/mpp-sche-clean
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at e61bffb4d2 Add Driver.failed() call in FragmentInstanceScheduler

This branch includes the following new commits:

     new e61bffb4d2 Add Driver.failed() call in FragmentInstanceScheduler

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Add Driver.failed() call in FragmentInstanceScheduler

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch feature/mpp-sche-clean
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e61bffb4d24c7c6377250c1cce4579bdc932432c
Author: ericpai <er...@hotmail.com>
AuthorDate: Tue Apr 19 17:25:52 2022 +0800

    Add Driver.failed() call in FragmentInstanceScheduler
---
 .../schedule/FragmentInstanceAbortedException.java | 35 ++++++++++++++
 .../db/mpp/schedule/FragmentInstanceScheduler.java |  9 ++++
 .../mpp/schedule/FragmentInstanceTaskExecutor.java |  1 +
 .../schedule/FragmentInstanceTimeoutSentinel.java  |  1 +
 .../db/mpp/schedule/task/FragmentInstanceTask.java | 10 ++++
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  | 18 +++++++
 .../schedule/FragmentInstanceSchedulerTest.java    | 20 ++++++++
 .../FragmentInstanceTimeoutSentinelTest.java       | 55 +++++++++++++---------
 8 files changed, 126 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
new file mode 100644
index 0000000000..20017340f6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.schedule;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.Driver;
+
+/** Exception handed to {@link Driver#failed(Throwable)} when a FragmentInstance is aborted. */
+public class FragmentInstanceAbortedException extends Exception {
+
+  public static final String BY_TIMEOUT = "timeout";
+  public static final String BY_FRAGMENT_ABORT_CALLED = "fragment abort called";
+  public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted";
+  public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
+
+  public FragmentInstanceAbortedException(FragmentInstanceId id, String causeMsg) {
+    super(String.format("FragmentInstance %s is aborted by %s", id, causeMsg)); // %s is null-safe
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index c7bdb95285..5c25e7e129 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -154,6 +154,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
       for (FragmentInstanceTask task : queryRelatedTasks) {
         task.lock();
         try {
+          task.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
           clearFragmentInstanceTask(task);
         } finally {
           task.unlock();
@@ -170,6 +171,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     }
     task.lock();
     try {
+      task.setAbortCause(FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED);
       clearFragmentInstanceTask(task);
     } finally {
       task.unlock();
@@ -190,6 +192,12 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
       task.setStatus(FragmentInstanceTaskStatus.ABORTED);
     }
+    if (task.getAbortCause() != null) {
+      task.getFragmentInstance()
+          .failed(
+              new FragmentInstanceAbortedException(
+                  task.getFragmentInstance().getInfo(), task.getAbortCause()));
+    }
     if (task.getStatus() == FragmentInstanceTaskStatus.ABORTED) {
       blockManager.forceDeregisterFragmentInstance(
           new TFragmentInstanceId(
@@ -345,6 +353,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
           }
           otherTask.lock();
           try {
+            otherTask.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
             clearFragmentInstanceTask(otherTask);
           } finally {
             otherTask.unlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index ed704c4ca4..fd19b67ee0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -59,6 +59,7 @@ public class FragmentInstanceTaskExecutor extends AbstractExecutor {
     // long cost = System.nanoTime() - startTime;
     // If the future is cancelled, the task is in an error and should be thrown.
     if (future.isCancelled()) {
+      task.setAbortCause(FragmentInstanceAbortedException.BY_ALREADY_BEING_CANCELLED);
       scheduler.toAborted(task);
       return;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
index c1327a3db7..e7aaaf4e47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
@@ -50,6 +50,7 @@ public class FragmentInstanceTimeoutSentinel extends AbstractExecutor {
       // After this time, the task must be timeout.
       Thread.sleep(waitTime);
     }
+    task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT);
     scheduler.toAborted(task);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index e30b1f15bc..abebdaf30d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -53,6 +53,8 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
   // Running stats
   private long cpuWallNano;
 
+  private String abortCause;
+
   /** Initialize a dummy instance for queryHolder */
   public FragmentInstanceTask() {
     this(new StubFragmentInstance(), 0L, null);
@@ -139,6 +141,14 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
     return o instanceof FragmentInstanceTask && ((FragmentInstanceTask) o).getId().equals(id);
   }
 
+  public String getAbortCause() {
+    return abortCause; // stays null until setAbortCause(...) stores a value
+  }
+
+  public void setAbortCause(String abortCause) {
+    this.abortCause = abortCause; // FragmentInstanceScheduler reads it when reporting failed()
+  }
+
   /** a comparator of ddl, the less the ddl is, the low order it has. */
   public static class TimeoutComparator implements Comparator<FragmentInstanceTask> {
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index e73529aa06..82ca5dfd2c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -94,6 +94,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -141,6 +142,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -193,6 +195,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -245,6 +248,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -296,6 +300,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
     Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -342,6 +347,9 @@ public class DefaultTaskSchedulerTest {
       Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
       Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1));
       Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2));
+
+      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+      Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
       clear();
     }
     FragmentInstanceTaskStatus[] validStates =
@@ -351,6 +359,11 @@ public class DefaultTaskSchedulerTest {
           FragmentInstanceTaskStatus.BLOCKED,
         };
     for (FragmentInstanceTaskStatus status : validStates) {
+      Mockito.reset(mockDriver1);
+      Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+      Mockito.reset(mockDriver2);
+      Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+
       FragmentInstanceTask testTask1 = new FragmentInstanceTask(mockDriver1, 100L, status);
 
       FragmentInstanceTask testTask2 =
@@ -377,6 +390,11 @@ public class DefaultTaskSchedulerTest {
       Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId()));
       Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId()));
       Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+
+      // The mockDriver1.failed() will be called outside the scheduler
+      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+      Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
+
       clear();
     }
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
index 233908ca44..56435496ab 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
@@ -117,6 +117,8 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
 
     // Abort one FragmentInstance
+    Mockito.reset(mockDriver1);
+    Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
     manager.abortFragmentInstance(instanceId1);
     Mockito.verify(mockDataBlockManager, Mockito.times(1))
         .forceDeregisterFragmentInstance(Mockito.any());
@@ -129,9 +131,18 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+    Mockito.verify(mockDriver1, Mockito.times(1)).failed(Mockito.any());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED, task1.getAbortCause());
 
     // Abort the whole query
     Mockito.reset(mockDataBlockManager);
+    Mockito.reset(mockDriver1);
+    Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+    Mockito.reset(mockDriver2);
+    Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+    Mockito.reset(mockDriver3);
+    Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3);
     manager.abortQuery(queryId);
     Mockito.verify(mockDataBlockManager, Mockito.times(2))
         .forceDeregisterFragmentInstance(Mockito.any());
@@ -144,5 +155,14 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task2.getStatus());
     Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task3.getStatus());
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+    Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+    Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
+    Mockito.verify(mockDriver3, Mockito.times(1)).failed(Mockito.any());
+    Mockito.verify(mockDriver4, Mockito.never()).failed(Mockito.any());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED, task2.getAbortCause());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED, task3.getAbortCause());
+    Assert.assertNull(task4.getAbortCause());
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
index 87d1de0870..862f4ca207 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
@@ -69,29 +69,33 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.FINISHED);
     executor.execute(testTask);
     Assert.assertEquals(FragmentInstanceTaskStatus.FINISHED, testTask.getStatus());
-    Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // ABORTED status test
     testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.ABORTED);
     executor.execute(testTask);
     Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, testTask.getStatus());
-    Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // RUNNING status test
     testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.RUNNING);
     executor.execute(testTask);
     Assert.assertEquals(FragmentInstanceTaskStatus.RUNNING, testTask.getStatus());
-    Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // BLOCKED status test
     testTask = new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.BLOCKED);
     executor.execute(testTask);
     Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED, testTask.getStatus());
-    Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).blockedToReady(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+    Assert.assertNull(testTask.getAbortCause());
+    Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
   }
 
   @Test
@@ -127,11 +131,13 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_ALREADY_BEING_CANCELLED, testTask.getAbortCause());
     Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).blockedToReady(Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
   }
 
   @Test
@@ -166,11 +172,12 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
+    Assert.assertNull(testTask.getAbortCause());
+    Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
     Mockito.verify(mockScheduler, Mockito.times(1)).runningToFinished(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).blockedToReady(Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
   }
 
   @Test
@@ -216,10 +223,11 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any());
+    Assert.assertNull(testTask.getAbortCause());
+    Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
     Mockito.verify(mockScheduler, Mockito.times(1)).runningToBlocked(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
     Mockito.verify(mockScheduler, Mockito.times(1)).blockedToReady(Mockito.any());
   }
 
@@ -266,10 +274,11 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, FragmentInstanceTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
+    Assert.assertNull(testTask.getAbortCause());
+    Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
     Mockito.verify(mockScheduler, Mockito.times(1)).runningToReady(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).blockedToReady(Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, Mockito.never()).blockedToReady(Mockito.any());
   }
 }