You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:01:03 UTC
[iotdb] 13/13: Fix DriverSchedulerTest
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2868432447cf31928acbb54c7e454a9ee08dd475
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sat Feb 11 20:00:27 2023 +0800
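Fix DriverSchedulerTest
Declare getDependencyDriverIndex() on the IDriver interface so that DriverScheduler can check for a dependency (index != -1) through IDriver instead of casting each task's driver to the concrete Driver class and calling hasDependency(), which is removed. The IDriver implementation nested in DriverTask and the IDriver mocks in DriverSchedulerTest now return -1 from getDependencyDriverIndex().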
---
.../main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java | 7 +------
.../java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java | 2 ++
.../apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java | 5 ++---
.../apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java | 5 +++++
.../iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java | 4 ++++
5 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c580ab047e..71fe7ccea2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -29,11 +29,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
+import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.GuardedBy;
-
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -105,10 +104,6 @@ public abstract class Driver implements IDriver {
/** release resource this driver used */
protected abstract void releaseResource();
- public boolean hasDependency() {
- return driverContext.getDependencyDriverIndex() != -1;
- }
-
public int getDependencyDriverIndex() {
return driverContext.getDependencyDriverIndex();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 8a55d098de..ff55d5456c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -71,4 +71,6 @@ public interface IDriver {
/** @return get SinkHandle of current IDriver */
ISinkHandle getSinkHandle();
+
+ int getDependencyDriverIndex();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index ef191ec5cb..7be42a9859 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.driver.Driver;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
@@ -189,8 +188,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
List<DriverTask> submittedTasks = new ArrayList<>();
for (DriverTask task : tasks) {
- Driver driver = (Driver) task.getDriver();
- if (driver.hasDependency()) {
+ IDriver driver = task.getDriver();
+ if (driver.getDependencyDriverIndex() != -1) {
SettableFuture<?> blockedDependencyFuture =
tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
blockedDependencyFuture.addListener(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index f692451cd1..66ce5ff677 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -267,5 +267,10 @@ public class DriverTask implements IDIndexedAccessible {
public ISinkHandle getSinkHandle() {
return null;
}
+
+ @Override
+ public int getDependencyDriverIndex() {
+ return -1;
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index 7e3612e499..ff8d0fdebf 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -63,10 +63,12 @@ public class DriverSchedulerTest {
DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
IDriver mockDriver1 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
+ Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
IDriver mockDriver2 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
+ Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -91,6 +93,7 @@ public class DriverSchedulerTest {
FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
+ Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
@@ -111,6 +114,7 @@ public class DriverSchedulerTest {
DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
IDriver mockDriver4 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
+ Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(2, manager.getQueryMap().size());