You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/20 11:18:02 UTC

[iotdb] branch stable-mpp updated: Fix UT

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/stable-mpp by this push:
     new c9cd225c22 Fix UT
c9cd225c22 is described below

commit c9cd225c2201a45d532fde1ee5060215a1683000
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Apr 20 19:17:53 2022 +0800

    Fix UT
---
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 10 ++--
 .../db/mpp/operator/process/LimitOperator.java     |  4 +-
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 56 ++++------------------
 .../operator/SeriesAggregateScanOperatorTest.java  | 14 +++++-
 4 files changed, 27 insertions(+), 57 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index c6e3e64739..60a1583550 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -194,16 +194,16 @@ public class SinkHandle implements ISinkHandle {
     if (closed) {
       return;
     }
-    synchronized (this) {
-      closed = true;
-      noMoreTsBlocks = true;
-    }
-    sinkHandleListener.onClosed(this);
     try {
       sendEndOfDataBlockEvent();
     } catch (TException e) {
       throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
     }
+    synchronized (this) {
+      closed = true;
+      noMoreTsBlocks = true;
+    }
+    sinkHandleListener.onClosed(this);
     logger.info("Sink handle {} is closed.", this);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index a226637146..24711217c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.IOException;
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
@@ -76,7 +74,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() throws IOException {
+  public boolean isFinished() {
     return remainingLimit == 0 || child.isFinished();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index b47b3c57c8..b04e9b3848 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -34,7 +34,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executors;
 
@@ -91,12 +90,7 @@ public class SinkHandleTest {
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    try {
-      sinkHandle.send(mockTsBlocks);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.send(mockTsBlocks);
     sinkHandle.setNoMoreTsBlocks();
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
@@ -145,12 +139,7 @@ public class SinkHandleTest {
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
 
     // Close the SinkHandle.
-    try {
-      sinkHandle.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.close();
     Assert.assertTrue(sinkHandle.isClosed());
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
     try {
@@ -222,12 +211,7 @@ public class SinkHandleTest {
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    try {
-      sinkHandle.send(mockTsBlocks);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.send(mockTsBlocks);
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -274,12 +258,7 @@ public class SinkHandleTest {
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
 
     // Send tsblocks.
-    try {
-      sinkHandle.send(mockTsBlocks);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.send(mockTsBlocks);
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -307,12 +286,7 @@ public class SinkHandleTest {
     // Close the SinkHandle.
     sinkHandle.setNoMoreTsBlocks();
     Assert.assertFalse(sinkHandle.isFinished());
-    try {
-      sinkHandle.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.close();
     Assert.assertTrue(sinkHandle.isClosed());
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
     try {
@@ -403,12 +377,7 @@ public class SinkHandleTest {
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    try {
-      sinkHandle.send(mockTsBlocks);
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail();
-    }
+    sinkHandle.send(mockTsBlocks);
     sinkHandle.setNoMoreTsBlocks();
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
@@ -434,19 +403,12 @@ public class SinkHandleTest {
       Assert.fail();
     }
 
-    try {
-      sinkHandle.send(Collections.singletonList(Mockito.mock(TsBlock.class)));
-      Assert.fail("Expect an IOException.");
-    } catch (IOException e) {
-      Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
-    }
-
     // Close the SinkHandle.
     try {
       sinkHandle.close();
-      Assert.fail("Expect an IOException.");
-    } catch (IOException e) {
-      Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
+      Assert.fail("Expect an RuntimeException.");
+    } catch (RuntimeException e) {
+      Assert.assertEquals("Send EndOfDataBlockEvent failed", e.getMessage());
     }
     Assert.assertFalse(sinkHandle.isClosed());
     Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onClosed(sinkHandle);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index d1c71f2eb4..d2c527d54c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -52,6 +54,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -64,16 +67,20 @@ public class SeriesAggregateScanOperatorTest {
 
   private final List<TsFileResource> seqResources = new ArrayList<>();
   private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private ExecutorService instanceNotificationExecutor;
 
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
     SeriesReaderTestUtil.setUp(
         measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
   }
 
   @After
   public void tearDown() throws IOException {
     SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    instanceNotificationExecutor.shutdown();
   }
 
   @Test
@@ -349,9 +356,12 @@ public class SeriesAggregateScanOperatorTest {
     QueryId queryId = new QueryId("stub_query");
     AtomicReference<FragmentInstanceState> state =
         new AtomicReference<>(FragmentInstanceState.RUNNING);
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
     FragmentInstanceContext fragmentInstanceContext =
-        new FragmentInstanceContext(
-            new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+        new FragmentInstanceContext(instanceId, stateMachine);
     PlanNodeId planNodeId = new PlanNodeId("1");
     fragmentInstanceContext.addOperatorContext(
         1, planNodeId, SeriesScanOperator.class.getSimpleName());