You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/02 03:40:40 UTC
[iotdb] branch ty-mpp updated: add DataDriverTest
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty-mpp by this push:
new f3698d8 add DataDriverTest
f3698d8 is described below
commit f3698d899f8a511354ac142773ba0cc6a4423f3e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sat Apr 2 11:39:47 2022 +0800
add DataDriverTest
---
.../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 67 ++++++++++++
.../apache/iotdb/db/mpp/execution/DataDriver.java | 47 +++++----
.../iotdb/db/mpp/execution/DriverContext.java | 8 ++
.../db/mpp/execution/FragmentInstanceContext.java | 29 +++++-
.../mpp/execution/FragmentInstanceExecution.java | 24 +++--
.../db/mpp/execution/FragmentInstanceManager.java | 19 +++-
.../iotdb/db/mpp/execution/SchemaDriver.java | 8 +-
.../org/apache/iotdb/db/mpp/operator/Operator.java | 1 +
.../mpp/schedule/FragmentInstanceTaskExecutor.java | 2 +-
.../DataDriverTest.java} | 116 ++++++++++++++-------
.../iotdb/db/mpp/operator/LimitOperatorTest.java | 6 +-
.../db/mpp/operator/SeriesScanOperatorTest.java | 6 +-
.../db/mpp/operator/TimeJoinOperatorTest.java | 6 +-
...est.java => FragmentInstanceSchedulerTest.java} | 2 +-
14 files changed, 262 insertions(+), 79 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
new file mode 100644
index 0000000..b920eb0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.buffer;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
+public class StubSinkHandle implements ISinkHandle {
+
+ private final ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
+
+ private final List<TsBlock> tsBlocks = new ArrayList<>();
+
+ @Override
+ public ListenableFuture<Void> isFull() {
+ return NOT_BLOCKED;
+ }
+
+ @Override
+ public void send(TsBlock tsBlock) {
+ tsBlocks.add(tsBlock);
+ }
+
+ @Override
+ public void send(int partition, TsBlock tsBlock) {
+ tsBlocks.add(tsBlock);
+ }
+
+ @Override
+ public void setNoMoreTsBlocks() {}
+
+ @Override
+ public void close() {
+ tsBlocks.clear();
+ }
+
+ @Override
+ public void abort() {
+ tsBlocks.clear();
+ }
+
+ public List<TsBlock> getTsBlocks() {
+ return tsBlocks;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 4d96322..0900fa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -57,7 +58,7 @@ public class DataDriver implements ExecFragmentInstance {
private final Operator root;
private final ISinkHandle sinkHandle;
- private final DataDriverContext dataDriverContext;
+ private final DataDriverContext driverContext;
private boolean init;
private boolean closed;
@@ -69,10 +70,12 @@ public class DataDriver implements ExecFragmentInstance {
private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
- public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext dataDriverContext) {
+ public DataDriver(Operator root, ISinkHandle sinkHandle, DataDriverContext driverContext) {
this.root = root;
this.sinkHandle = sinkHandle;
- this.dataDriverContext = dataDriverContext;
+ this.driverContext = driverContext;
+ this.closedFilePaths = new HashSet<>();
+ this.unClosedFilePaths = new HashSet<>();
// initially the driverBlockedFuture is not blocked (it is completed)
SettableFuture<Void> future = SettableFuture.create();
future.set(null);
@@ -81,14 +84,17 @@ public class DataDriver implements ExecFragmentInstance {
@Override
public boolean isFinished() {
- if (closed) {
- return true;
- }
try {
- return root != null && root.isFinished();
+ boolean isFinished =
+ closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
+ if (isFinished) {
+ driverContext.finish();
+ }
+ return isFinished;
} catch (Throwable t) {
logger.error(
- "Failed to query whether the data driver {} is finished", dataDriverContext.getId(), t);
+ "Failed to query whether the data driver {} is finished", driverContext.getId(), t);
+ driverContext.failed(t);
close();
return true;
}
@@ -105,9 +111,8 @@ public class DataDriver implements ExecFragmentInstance {
initialize();
} catch (Throwable t) {
logger.error(
- "Failed to do the initialization for fragment instance {} ",
- dataDriverContext.getId(),
- t);
+ "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
+ driverContext.failed(t);
close();
return NOT_BLOCKED;
}
@@ -130,7 +135,8 @@ public class DataDriver implements ExecFragmentInstance {
}
} while (System.nanoTime() - start < maxRuntime && !root.isFinished());
} catch (Throwable t) {
- logger.error("Failed to execute fragment instance {}", dataDriverContext.getId(), t);
+ logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+ driverContext.failed(t);
close();
}
return NOT_BLOCKED;
@@ -138,7 +144,7 @@ public class DataDriver implements ExecFragmentInstance {
@Override
public FragmentInstanceId getInfo() {
- return dataDriverContext.getId();
+ return driverContext.getId();
}
@Override
@@ -159,7 +165,8 @@ public class DataDriver implements ExecFragmentInstance {
sinkHandle.close();
}
} catch (Throwable t) {
- logger.error("Failed to closed driver {}", dataDriverContext.getId(), t);
+ logger.error("Failed to closed driver {}", driverContext.getId(), t);
+ driverContext.failed(t);
} finally {
removeUsedFilesForQuery();
}
@@ -170,7 +177,7 @@ public class DataDriver implements ExecFragmentInstance {
* we should change all the blocked lock operation into tryLock
*/
private void initialize() throws QueryProcessException {
- List<SourceOperator> sourceOperators = dataDriverContext.getSourceOperators();
+ List<SourceOperator> sourceOperators = driverContext.getSourceOperators();
if (sourceOperators != null && !sourceOperators.isEmpty()) {
QueryDataSource dataSource = initQueryDataSourceCache();
sourceOperators.forEach(
@@ -193,11 +200,11 @@ public class DataDriver implements ExecFragmentInstance {
* QueryDataSource needed for this query
*/
public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
- VirtualStorageGroupProcessor dataRegion = dataDriverContext.getDataRegion();
+ VirtualStorageGroupProcessor dataRegion = driverContext.getDataRegion();
dataRegion.readLock();
try {
List<PartialPath> pathList =
- dataDriverContext.getPaths().stream()
+ driverContext.getPaths().stream()
.map(IDTable::translateQueryPath)
.collect(Collectors.toList());
// when all the selected series are under the same device, the QueryDataSource will be
@@ -209,8 +216,8 @@ public class DataDriver implements ExecFragmentInstance {
dataRegion.query(
pathList,
selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
- dataDriverContext.getFragmentInstanceContext(),
- dataDriverContext.getTimeFilter());
+ driverContext.getFragmentInstanceContext(),
+ driverContext.getTimeFilter());
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
@@ -218,7 +225,7 @@ public class DataDriver implements ExecFragmentInstance {
return dataSource;
} finally {
- dataDriverContext.getDataRegion().readUnlock();
+ dataRegion.readUnlock();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index fddc3a5..2168c24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -35,4 +35,12 @@ public class DriverContext {
public FragmentInstanceContext getFragmentInstanceContext() {
return fragmentInstanceContext;
}
+
+ public void failed(Throwable cause) {
+ fragmentInstanceContext.failed(cause);
+ }
+
+ public void finish() {
+ fragmentInstanceContext.finish();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index fc3c82d..550a293 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -25,12 +25,13 @@ import org.apache.iotdb.db.query.context.QueryContext;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
public class FragmentInstanceContext extends QueryContext {
- private FragmentInstanceId id;
+ private final FragmentInstanceId id;
// TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
// with CopyOnWriteArrayList or some other thread safe data structure
@@ -39,6 +40,9 @@ public class FragmentInstanceContext extends QueryContext {
private DriverContext driverContext;
+ // TODO we may use StateMachine<FragmentInstanceState> to replace it
+ private final AtomicReference<FragmentInstanceState> state;
+
// private final GcMonitor gcMonitor;
// private final AtomicLong startNanos = new AtomicLong();
// private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -47,8 +51,10 @@ public class FragmentInstanceContext extends QueryContext {
// private final AtomicLong endFullGcCount = new AtomicLong(-1);
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
- public FragmentInstanceContext(FragmentInstanceId id) {
+ public FragmentInstanceContext(
+ FragmentInstanceId id, AtomicReference<FragmentInstanceState> state) {
this.id = id;
+ this.state = state;
}
public OperatorContext addOperatorContext(
@@ -83,4 +89,23 @@ public class FragmentInstanceContext extends QueryContext {
public void setDriverContext(DriverContext driverContext) {
this.driverContext = driverContext;
}
+
+ public void failed(Throwable cause) {
+ state.set(FragmentInstanceState.FAILED);
+ }
+
+ public void cancel() {
+ state.set(FragmentInstanceState.CANCELED);
+ }
+
+ public void abort() {
+ state.set(FragmentInstanceState.ABORTED);
+ }
+
+ public void finish() {
+ if (state.get().isDone()) {
+ return;
+ }
+ state.set(FragmentInstanceState.FINISHED);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
index 1791e3f..4bfb458 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
import com.google.common.collect.ImmutableList;
+import java.util.concurrent.atomic.AtomicReference;
+
import static java.util.Objects.requireNonNull;
public class FragmentInstanceExecution {
@@ -34,7 +36,8 @@ public class FragmentInstanceExecution {
private final ExecFragmentInstance driver;
- private FragmentInstanceState state;
+ // TODO we may use StateMachine<FragmentInstanceState> to replace it
+ private final AtomicReference<FragmentInstanceState> state;
private long lastHeartbeat;
@@ -42,11 +45,14 @@ public class FragmentInstanceExecution {
IFragmentInstanceScheduler scheduler,
FragmentInstanceId instanceId,
FragmentInstanceContext context,
- ExecFragmentInstance driver) {
+ ExecFragmentInstance driver,
+ AtomicReference<FragmentInstanceState> state) {
this.scheduler = scheduler;
this.instanceId = instanceId;
this.context = context;
this.driver = driver;
+ this.state = state;
+ state.set(FragmentInstanceState.RUNNING);
scheduler.submitFragmentInstances(instanceId.getQueryId(), ImmutableList.of(driver));
}
@@ -59,27 +65,23 @@ public class FragmentInstanceExecution {
}
public FragmentInstanceState getInstanceState() {
- return state;
- }
-
- public void setState(FragmentInstanceState state) {
- this.state = state;
+ return state.get();
}
public FragmentInstanceInfo getInstanceInfo() {
- return new FragmentInstanceInfo(state);
+ return new FragmentInstanceInfo(state.get());
}
public void failed(Throwable cause) {
requireNonNull(cause, "cause is null");
- // TODO
+ context.failed(cause);
}
public void cancel() {
- // TODO
+ context.cancel();
}
public void abort() {
- // TODO
+ context.abort();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 91a31c6..4e9c61b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
import static java.util.Objects.requireNonNull;
@@ -55,8 +56,13 @@ public class FragmentInstanceManager {
instanceExecution.computeIfAbsent(
instanceId,
id -> {
+ AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
+ state.set(FragmentInstanceState.PLANNED);
+
FragmentInstanceContext context =
- instanceContext.computeIfAbsent(instanceId, FragmentInstanceContext::new);
+ instanceContext.computeIfAbsent(
+ instanceId,
+ fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
DataDriver driver =
planner.plan(
@@ -64,7 +70,7 @@ public class FragmentInstanceManager {
context,
instance.getTimeFilter(),
dataRegion);
- return new FragmentInstanceExecution(scheduler, instanceId, context, driver);
+ return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
});
return execution.getInstanceInfo();
@@ -78,12 +84,17 @@ public class FragmentInstanceManager {
instanceExecution.computeIfAbsent(
instanceId,
id -> {
+ AtomicReference<FragmentInstanceState> state = new AtomicReference<>();
+ state.set(FragmentInstanceState.PLANNED);
+
FragmentInstanceContext context =
- instanceContext.computeIfAbsent(instanceId, FragmentInstanceContext::new);
+ instanceContext.computeIfAbsent(
+ instanceId,
+ fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
SchemaDriver driver =
planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
- return new FragmentInstanceExecution(scheduler, instanceId, context, driver);
+ return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
});
return execution.getInstanceInfo();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index efb5484..844381b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -62,10 +62,15 @@ public class SchemaDriver implements ExecFragmentInstance {
@Override
public boolean isFinished() {
try {
- return root != null && root.isFinished();
+ boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
+ if (isFinished) {
+ driverContext.finish();
+ }
+ return isFinished;
} catch (Throwable t) {
logger.error(
"Failed to query whether the schema driver {} is finished", driverContext.getId(), t);
+ driverContext.failed(t);
return true;
}
}
@@ -90,6 +95,7 @@ public class SchemaDriver implements ExecFragmentInstance {
} while (System.nanoTime() - start < maxRuntime && !root.isFinished());
} catch (Throwable t) {
logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+ driverContext.failed(t);
close();
}
return NOT_BLOCKED;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index 0821424..af18fb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
public interface Operator extends AutoCloseable {
+
ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
OperatorContext getOperatorContext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index db9cee9..7f0ac66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
/** the worker thread of {@link FragmentInstanceTask} */
public class FragmentInstanceTaskExecutor extends AbstractExecutor {
- private static final Duration EXECUTION_TIME_SLICE = new Duration(100, TimeUnit.MILLISECONDS);
+ public static final Duration EXECUTION_TIME_SLICE = new Duration(100, TimeUnit.MILLISECONDS);
// As the callback is lightweight enough, there's no need to use another one thread to execute.
private static final Executor listeningExecutor = MoreExecutors.directExecutor();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
similarity index 57%
copy from server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
copy to server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index e901ab6..c5f621b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -16,17 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.operator;
+package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.buffer.StubSinkHandle;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@ -39,9 +42,12 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
@@ -49,14 +55,17 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor.EXECUTION_TIME_SLICE;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class LimitOperatorTest {
+public class DataDriverTest {
- private static final String TIME_JOIN_OPERATOR_TEST_SG = "root.LimitOperatorTest";
+ private static final String DATA_DRIVER_TEST_SG = "root.DataDriverTest";
private final List<String> deviceIds = new ArrayList<>();
private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
@@ -66,7 +75,7 @@ public class LimitOperatorTest {
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
SeriesReaderTestUtil.setUp(
- measurementSchemas, deviceIds, seqResources, unSeqResources, TIME_JOIN_OPERATOR_TEST_SG);
+ measurementSchemas, deviceIds, seqResources, unSeqResources, DATA_DRIVER_TEST_SG);
}
@After
@@ -78,14 +87,16 @@ public class LimitOperatorTest {
public void batchTest() {
try {
MeasurementPath measurementPath1 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
+ AtomicReference<FragmentInstanceState> state =
+ new AtomicReference<>(FragmentInstanceState.RUNNING);
FragmentInstanceContext fragmentInstanceContext =
new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
fragmentInstanceContext.addOperatorContext(
1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
fragmentInstanceContext.addOperatorContext(
@@ -103,10 +114,9 @@ public class LimitOperatorTest {
null,
null,
true);
- seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
MeasurementPath measurementPath2 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+ new MeasurementPath(DATA_DRIVER_TEST_SG + ".device0.sensor1", TSDataType.INT32);
SeriesScanOperator seriesScanOperator2 =
new SeriesScanOperator(
measurementPath2,
@@ -116,7 +126,6 @@ public class LimitOperatorTest {
null,
null,
true);
- seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
TimeJoinOperator timeJoinOperator =
new TimeJoinOperator(
@@ -128,37 +137,72 @@ public class LimitOperatorTest {
LimitOperator limitOperator =
new LimitOperator(
fragmentInstanceContext.getOperatorContexts().get(3), 250, timeJoinOperator);
- int count = 0;
- while (limitOperator.hasNext()) {
- TsBlock tsBlock = limitOperator.next();
- assertEquals(2, tsBlock.getValueColumnCount());
- assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
- assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
- if (count < 12) {
- assertEquals(20, tsBlock.getPositionCount());
- } else {
- assertEquals(10, tsBlock.getPositionCount());
+
+ VirtualStorageGroupProcessor dataRegion = Mockito.mock(VirtualStorageGroupProcessor.class);
+
+ List<PartialPath> pathList = ImmutableList.of(measurementPath1, measurementPath2);
+ String deviceId = DATA_DRIVER_TEST_SG + ".device0";
+
+ Mockito.when(dataRegion.query(pathList, deviceId, fragmentInstanceContext, null))
+ .thenReturn(new QueryDataSource(seqResources, unSeqResources));
+
+ DataDriverContext driverContext =
+ new DataDriverContext(
+ fragmentInstanceContext,
+ pathList,
+ null,
+ dataRegion,
+ ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
+
+ StubSinkHandle sinkHandle = new StubSinkHandle();
+
+ try (ExecFragmentInstance dataDriver =
+ new DataDriver(limitOperator, sinkHandle, driverContext)) {
+ assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
+
+ assertFalse(dataDriver.isFinished());
+
+ while (!dataDriver.isFinished()) {
+ assertEquals(FragmentInstanceState.RUNNING, state.get());
+ ListenableFuture<Void> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE);
+ assertTrue(blocked.isDone());
}
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- long expectedTime = i + 20L * count;
- assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
- if (expectedTime < 200) {
- assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
- assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
- } else if (expectedTime < 260
- || (expectedTime >= 300 && expectedTime < 380)
- || expectedTime >= 400) {
- assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
- assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+
+ assertEquals(FragmentInstanceState.FINISHED, state.get());
+
+ List<TsBlock> result = sinkHandle.getTsBlocks();
+ assertEquals(13, result.size());
+
+ for (int i = 0; i < 13; i++) {
+ TsBlock tsBlock = result.get(i);
+ assertEquals(2, tsBlock.getValueColumnCount());
+ assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+ assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+
+ if (i < 12) {
+ assertEquals(20, tsBlock.getPositionCount());
} else {
- assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
- assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+ assertEquals(10, tsBlock.getPositionCount());
+ }
+ for (int j = 0; j < tsBlock.getPositionCount(); j++) {
+ long expectedTime = j + 20L * i;
+ assertEquals(expectedTime, tsBlock.getTimeByIndex(j));
+ if (expectedTime < 200) {
+ assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(j));
+ assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(j));
+ } else if (expectedTime < 260
+ || (expectedTime >= 300 && expectedTime < 380)
+ || expectedTime >= 400) {
+ assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(j));
+ assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(j));
+ } else {
+ assertEquals(expectedTime, tsBlock.getColumn(0).getInt(j));
+ assertEquals(expectedTime, tsBlock.getColumn(1).getInt(j));
+ }
}
}
- count++;
}
- assertEquals(13, count);
- } catch (IOException | IllegalPathException e) {
+ } catch (IllegalPathException | QueryProcessException e) {
e.printStackTrace();
fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index e901ab6..f1e71d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
@@ -49,6 +50,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -83,9 +85,11 @@ public class LimitOperatorTest {
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
+ AtomicReference<FragmentInstanceState> state =
+ new AtomicReference<>(FragmentInstanceState.RUNNING);
FragmentInstanceContext fragmentInstanceContext =
new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
fragmentInstanceContext.addOperatorContext(
1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
fragmentInstanceContext.addOperatorContext(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 1fd496e..db5b568 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -45,6 +46,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -77,9 +79,11 @@ public class SeriesScanOperatorTest {
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
QueryId queryId = new QueryId("stub_query");
+ AtomicReference<FragmentInstanceState> state =
+ new AtomicReference<>(FragmentInstanceState.RUNNING);
FragmentInstanceContext fragmentInstanceContext =
new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
fragmentInstanceContext.addOperatorContext(
1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
SeriesScanOperator seriesScanOperator =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index a22554f..2e49975 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -48,6 +49,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
@@ -79,9 +81,11 @@ public class TimeJoinOperatorTest {
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
+ AtomicReference<FragmentInstanceState> state =
+ new AtomicReference<>(FragmentInstanceState.RUNNING);
FragmentInstanceContext fragmentInstanceContext =
new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"));
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
fragmentInstanceContext.addOperatorContext(
1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
fragmentInstanceContext.addOperatorContext(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
index c79d376..7ecf8d1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
@@ -36,7 +36,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-public class FragmentInstanceManagerTest {
+public class FragmentInstanceSchedulerTest {
private final FragmentInstanceScheduler manager = FragmentInstanceScheduler.getInstance();