You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:01:03 UTC

[iotdb] 13/13: Fix DriverSchedulerTest

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2868432447cf31928acbb54c7e454a9ee08dd475
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sat Feb 11 20:00:27 2023 +0800

    Fix DriverSchedulerTest
---
 .../main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java | 7 +------
 .../java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java     | 2 ++
 .../apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java    | 5 ++---
 .../apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java    | 5 +++++
 .../iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java       | 4 ++++
 5 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c580ab047e..71fe7ccea2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -29,11 +29,10 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.units.Duration;
+import javax.annotation.concurrent.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -105,10 +104,6 @@ public abstract class Driver implements IDriver {
   /** release resource this driver used */
   protected abstract void releaseResource();
 
-  public boolean hasDependency() {
-    return driverContext.getDependencyDriverIndex() != -1;
-  }
-
   public int getDependencyDriverIndex() {
     return driverContext.getDependencyDriverIndex();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 8a55d098de..ff55d5456c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -71,4 +71,6 @@ public interface IDriver {
 
   /** @return get SinkHandle of current IDriver */
   ISinkHandle getSinkHandle();
+
+  int getDependencyDriverIndex();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index ef191ec5cb..7be42a9859 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.driver.Driver;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
@@ -189,8 +188,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
 
     List<DriverTask> submittedTasks = new ArrayList<>();
     for (DriverTask task : tasks) {
-      Driver driver = (Driver) task.getDriver();
-      if (driver.hasDependency()) {
+      IDriver driver = task.getDriver();
+      if (driver.getDependencyDriverIndex() != -1) {
         SettableFuture<?> blockedDependencyFuture =
             tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
         blockedDependencyFuture.addListener(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index f692451cd1..66ce5ff677 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -267,5 +267,10 @@ public class DriverTask implements IDIndexedAccessible {
     public ISinkHandle getSinkHandle() {
       return null;
     }
+
+    @Override
+    public int getDependencyDriverIndex() {
+      return -1;
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index 7e3612e499..ff8d0fdebf 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -63,10 +63,12 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
     IDriver mockDriver1 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
+    Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
     FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
     DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
     IDriver mockDriver2 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
+    Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
     List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
     manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -91,6 +93,7 @@ public class DriverSchedulerTest {
     FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
     DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
     Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
+    Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
     manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
@@ -111,6 +114,7 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
     IDriver mockDriver4 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
+    Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
     manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(2, manager.getQueryMap().size());