You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/18 10:04:25 UTC
[iotdb] branch master updated: Fix some bug about clear environment after testing (#8046)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 36d98436be Fix some bug about clear environment after testing (#8046)
36d98436be is described below
commit 36d98436be0114cd03df9940f00505db9e1c9723
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Fri Nov 18 18:04:19 2022 +0800
Fix some bug about clear environment after testing (#8046)
---
.../apache/iotdb/db/engine/StorageEngineV2.java | 4 +-
.../sync/pipedata/BufferedPipeDataQueueTest.java | 475 +++++++++++----------
.../apache/iotdb/db/utils/EnvironmentUtils.java | 5 -
3 files changed, 251 insertions(+), 233 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 19cd375a23..1889a1e326 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -394,7 +394,9 @@ public class StorageEngineV2 implements IService {
seqMemtableTimedFlushCheckThread, ThreadName.TIMED_FlUSH_SEQ_MEMTABLE);
ThreadUtils.stopThreadPool(
unseqMemtableTimedFlushCheckThread, ThreadName.TIMED_FlUSH_UNSEQ_MEMTABLE);
- cachedThreadPool.shutdownNow();
+ if (cachedThreadPool != null) {
+ cachedThreadPool.shutdownNow();
+ }
dataRegionMap.clear();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
index 94c679dbab..89dfa25bc5 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
@@ -43,7 +43,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BufferedPipeDataQueueTest {
- // test4
private static final Logger logger = LoggerFactory.getLogger(BufferedPipeDataQueueTest.class);
File pipeLogDir =
@@ -134,64 +133,70 @@ public class BufferedPipeDataQueueTest {
@Test
public void testTakeAndOffer() {
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
- List<PipeData> pipeDatas = new ArrayList<>();
- ExecutorService es1 = Executors.newSingleThreadExecutor();
- es1.execute(
- () -> {
- try {
- pipeDatas.add(pipeDataQueue.take());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- });
- pipeDataQueue.offer(new TsFilePipeData("", 0));
try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- es1.shutdownNow();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- Assert.fail();
+ List<PipeData> pipeDatas = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ try {
+ pipeDatas.add(pipeDataQueue.take());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ pipeDataQueue.offer(new TsFilePipeData("", 0));
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(1, pipeDatas.size());
+ } finally {
+ pipeDataQueue.clear();
}
- Assert.assertEquals(1, pipeDatas.size());
- pipeDataQueue.clear();
}
/** Try to offer data to a new pipe. */
@Test
public void testOfferNewPipe() {
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
- PipeData pipeData = new TsFilePipeData("fakePath", 1);
- pipeDataQueue.offer(pipeData);
- List<PipeData> pipeDatas = new ArrayList<>();
- ExecutorService es1 = Executors.newSingleThreadExecutor();
- es1.execute(
- () -> {
- try {
- pipeDatas.add(pipeDataQueue.take());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- });
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- es1.shutdownNow();
try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- Assert.fail();
+ PipeData pipeData = new TsFilePipeData("fakePath", 1);
+ pipeDataQueue.offer(pipeData);
+ List<PipeData> pipeDatas = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ try {
+ pipeDatas.add(pipeDataQueue.take());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(1, pipeDatas.size());
+ Assert.assertEquals(pipeData, pipeDatas.get(0));
+ } finally {
+ pipeDataQueue.clear();
}
- Assert.assertEquals(1, pipeDatas.size());
- Assert.assertEquals(pipeData, pipeDatas.get(0));
- pipeDataQueue.clear();
}
/**
@@ -244,43 +249,47 @@ public class BufferedPipeDataQueueTest {
pipeLogOutput3.close();
// recovery
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
- Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
- Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
- PipeData offerPipeData = new TsFilePipeData("fake11", 11);
- pipeDataList.add(offerPipeData);
- pipeDataQueue.offer(offerPipeData);
+ try {
- // take and check
- List<PipeData> pipeDataTakeList = new ArrayList<>();
- ExecutorService es1 = Executors.newSingleThreadExecutor();
- es1.execute(
- () -> {
- while (true) {
- try {
- pipeDataTakeList.add(pipeDataQueue.take());
- pipeDataQueue.commit();
- } catch (InterruptedException e) {
- break;
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+ PipeData offerPipeData = new TsFilePipeData("fake11", 11);
+ pipeDataList.add(offerPipeData);
+ pipeDataQueue.offer(offerPipeData);
+
+ // take and check
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ pipeDataTakeList.add(pipeDataQueue.take());
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ }
}
- }
- });
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- es1.shutdownNow();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- Assert.fail();
- }
- Assert.assertEquals(10, pipeDataTakeList.size());
- for (int i = 0; i < 10; i++) {
- Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ });
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(10, pipeDataTakeList.size());
+ for (int i = 0; i < 10; i++) {
+ Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ }
+ } finally {
+ pipeDataQueue.clear();
}
- pipeDataQueue.clear();
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -334,40 +343,43 @@ public class BufferedPipeDataQueueTest {
pipeLogOutput3.close();
// recovery
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
- Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
- Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+ try {
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
- // take and check
- List<PipeData> pipeDataTakeList = new ArrayList<>();
- ExecutorService es1 = Executors.newSingleThreadExecutor();
- es1.execute(
- () -> {
- while (true) {
- try {
- pipeDataTakeList.add(pipeDataQueue.take());
- pipeDataQueue.commit();
- } catch (InterruptedException e) {
- break;
+ // take and check
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ pipeDataTakeList.add(pipeDataQueue.take());
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ }
}
- }
- });
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- es1.shutdownNow();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- Assert.fail();
- }
- Assert.assertEquals(9, pipeDataTakeList.size());
- for (int i = 0; i < 9; i++) {
- Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ });
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(9, pipeDataTakeList.size());
+ for (int i = 0; i < 9; i++) {
+ Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ }
+ } finally {
+ pipeDataQueue.clear();
}
- pipeDataQueue.clear();
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -416,40 +428,43 @@ public class BufferedPipeDataQueueTest {
;
// recovery
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
- Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
- Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+ try {
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
- // take and check
- List<PipeData> pipeDataTakeList = new ArrayList<>();
- ExecutorService es1 = Executors.newSingleThreadExecutor();
- es1.execute(
- () -> {
- while (true) {
- try {
- pipeDataTakeList.add(pipeDataQueue.take());
- pipeDataQueue.commit();
- } catch (InterruptedException e) {
- break;
+ // take and check
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ pipeDataTakeList.add(pipeDataQueue.take());
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ }
}
- }
- });
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- es1.shutdownNow();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- Assert.fail();
- }
- Assert.assertEquals(9, pipeDataTakeList.size());
- for (int i = 0; i < 9; i++) {
- Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ });
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(9, pipeDataTakeList.size());
+ for (int i = 0; i < 9; i++) {
+ Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ }
+ } finally {
+ pipeDataQueue.clear();
}
- pipeDataQueue.clear();
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -497,48 +512,51 @@ public class BufferedPipeDataQueueTest {
;
// recovery
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
- Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
- Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+ try {
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
- // take
- List<PipeData> pipeDataTakeList = new ArrayList<>();
- ExecutorService es1 = Executors.newSingleThreadExecutor();
- es1.execute(
- () -> {
- while (true) {
- try {
- pipeDataTakeList.add(pipeDataQueue.take());
- pipeDataQueue.commit();
- } catch (InterruptedException e) {
- break;
- } catch (Exception e) {
- e.printStackTrace();
- break;
+ // take
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ pipeDataTakeList.add(pipeDataQueue.take());
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ } catch (Exception e) {
+ e.printStackTrace();
+ break;
+ }
}
- }
- });
- // offer
- for (int i = 11; i < 20; i++) {
- pipeDataQueue.offer(
- new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
- }
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- es1.shutdownNow();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- Assert.fail();
- }
- Assert.assertEquals(18, pipeDataTakeList.size());
- for (int i = 0; i < 9; i++) {
- Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ });
+ // offer
+ for (int i = 11; i < 20; i++) {
+ pipeDataQueue.offer(
+ new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+ }
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(18, pipeDataTakeList.size());
+ for (int i = 0; i < 9; i++) {
+ Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ }
+ } finally {
+ pipeDataQueue.clear();
}
- pipeDataQueue.clear();
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -582,53 +600,56 @@ public class BufferedPipeDataQueueTest {
;
// recovery
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
- Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
- Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+ try {
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
- // take
- List<PipeData> pipeDataTakeList = new ArrayList<>();
- ExecutorService es1 = Executors.newSingleThreadExecutor();
- es1.execute(
- () -> {
- while (true) {
- try {
- PipeData pipeData = pipeDataQueue.take();
- logger.info(String.format("PipeData: %s", pipeData));
- pipeDataTakeList.add(pipeData);
- pipeDataQueue.commit();
- } catch (InterruptedException e) {
- break;
- } catch (Exception e) {
- e.printStackTrace();
- break;
+ // take
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ PipeData pipeData = pipeDataQueue.take();
+ logger.info(String.format("PipeData: %s", pipeData));
+ pipeDataTakeList.add(pipeData);
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ } catch (Exception e) {
+ e.printStackTrace();
+ break;
+ }
}
- }
- });
- // offer
- for (int i = 16; i < 20; i++) {
- if (!pipeDataQueue.offer(
- new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i))) {
- logger.info(String.format("Can not offer serialize number %d", i));
+ });
+ // offer
+ for (int i = 16; i < 20; i++) {
+ if (!pipeDataQueue.offer(
+ new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i))) {
+ logger.info(String.format("Can not offer serialize number %d", i));
+ }
}
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ logger.info(String.format("PipeDataTakeList: %s", pipeDataTakeList));
+ Assert.assertEquals(10, pipeDataTakeList.size());
+ for (int i = 0; i < 6; i++) {
+ Assert.assertEquals(pipeDataList.get(i), pipeDataTakeList.get(i));
+ }
+ } finally {
+ pipeDataQueue.clear();
}
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- es1.shutdownNow();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- Assert.fail();
- }
- logger.info(String.format("PipeDataTakeList: %s", pipeDataTakeList));
- Assert.assertEquals(10, pipeDataTakeList.size());
- for (int i = 0; i < 6; i++) {
- Assert.assertEquals(pipeDataList.get(i), pipeDataTakeList.get(i));
- }
- pipeDataQueue.clear();
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index e74aad229f..9dea24d716 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -148,11 +148,6 @@ public class EnvironmentUtils {
WALRecoverManager.getInstance().clear();
StorageEngineV2.getInstance().stop();
- // clean database manager
- // if (!StorageEngine.getInstance().deleteAll()) {
- // logger.error("Can't close the database manager in EnvironmentUtils");
- // fail();
- // }
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running);
// We must disable MQTT service as it will cost a lot of time to be shutdown, which may slow our