You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/20 11:18:02 UTC
[iotdb] branch stable-mpp updated: Fix UT
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/stable-mpp by this push:
new c9cd225c22 Fix UT
c9cd225c22 is described below
commit c9cd225c2201a45d532fde1ee5060215a1683000
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Apr 20 19:17:53 2022 +0800
Fix UT
---
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 10 ++--
.../db/mpp/operator/process/LimitOperator.java | 4 +-
.../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 56 ++++------------------
.../operator/SeriesAggregateScanOperatorTest.java | 14 +++++-
4 files changed, 27 insertions(+), 57 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index c6e3e64739..60a1583550 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -194,16 +194,16 @@ public class SinkHandle implements ISinkHandle {
if (closed) {
return;
}
- synchronized (this) {
- closed = true;
- noMoreTsBlocks = true;
- }
- sinkHandleListener.onClosed(this);
try {
sendEndOfDataBlockEvent();
} catch (TException e) {
throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
}
+ synchronized (this) {
+ closed = true;
+ noMoreTsBlocks = true;
+ }
+ sinkHandleListener.onClosed(this);
logger.info("Sink handle {} is closed.", this);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index a226637146..24711217c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
-import java.io.IOException;
-
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -76,7 +74,7 @@ public class LimitOperator implements ProcessOperator {
}
@Override
- public boolean isFinished() throws IOException {
+ public boolean isFinished() {
return remainingLimit == 0 || child.isFinished();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index b47b3c57c8..b04e9b3848 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -34,7 +34,6 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
@@ -91,12 +90,7 @@ public class SinkHandleTest {
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- try {
- sinkHandle.send(mockTsBlocks);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.send(mockTsBlocks);
sinkHandle.setNoMoreTsBlocks();
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
@@ -145,12 +139,7 @@ public class SinkHandleTest {
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
// Close the SinkHandle.
- try {
- sinkHandle.close();
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.close();
Assert.assertTrue(sinkHandle.isClosed());
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
try {
@@ -222,12 +211,7 @@ public class SinkHandleTest {
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- try {
- sinkHandle.send(mockTsBlocks);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.send(mockTsBlocks);
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isClosed());
@@ -274,12 +258,7 @@ public class SinkHandleTest {
.free(queryId, numOfMockTsBlock * mockTsBlockSize);
// Send tsblocks.
- try {
- sinkHandle.send(mockTsBlocks);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.send(mockTsBlocks);
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
Assert.assertFalse(sinkHandle.isClosed());
@@ -307,12 +286,7 @@ public class SinkHandleTest {
// Close the SinkHandle.
sinkHandle.setNoMoreTsBlocks();
Assert.assertFalse(sinkHandle.isFinished());
- try {
- sinkHandle.close();
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.close();
Assert.assertTrue(sinkHandle.isClosed());
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
try {
@@ -403,12 +377,7 @@ public class SinkHandleTest {
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
- try {
- sinkHandle.send(mockTsBlocks);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ sinkHandle.send(mockTsBlocks);
sinkHandle.setNoMoreTsBlocks();
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
@@ -434,19 +403,12 @@ public class SinkHandleTest {
Assert.fail();
}
- try {
- sinkHandle.send(Collections.singletonList(Mockito.mock(TsBlock.class)));
- Assert.fail("Expect an IOException.");
- } catch (IOException e) {
- Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
- }
-
// Close the SinkHandle.
try {
sinkHandle.close();
- Assert.fail("Expect an IOException.");
- } catch (IOException e) {
- Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
+ Assert.fail("Expect an RuntimeException.");
+ } catch (RuntimeException e) {
+ Assert.assertEquals("Send EndOfDataBlockEvent failed", e.getMessage());
}
Assert.assertFalse(sinkHandle.isClosed());
Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onClosed(sinkHandle);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index d1c71f2eb4..d2c527d54c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -52,6 +54,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
@@ -64,16 +67,20 @@ public class SeriesAggregateScanOperatorTest {
private final List<TsFileResource> seqResources = new ArrayList<>();
private final List<TsFileResource> unSeqResources = new ArrayList<>();
+ private ExecutorService instanceNotificationExecutor;
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
SeriesReaderTestUtil.setUp(
measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+ this.instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
}
@After
public void tearDown() throws IOException {
SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ instanceNotificationExecutor.shutdown();
}
@Test
@@ -349,9 +356,12 @@ public class SeriesAggregateScanOperatorTest {
QueryId queryId = new QueryId("stub_query");
AtomicReference<FragmentInstanceState> state =
new AtomicReference<>(FragmentInstanceState.RUNNING);
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
- new FragmentInstanceContext(
- new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+ new FragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
1, planNodeId, SeriesScanOperator.class.getSimpleName());