You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/18 10:04:25 UTC

[iotdb] branch master updated: Fix some bug about clear environment after testing (#8046)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 36d98436be Fix some bug about clear environment after testing (#8046)
36d98436be is described below

commit 36d98436be0114cd03df9940f00505db9e1c9723
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Fri Nov 18 18:04:19 2022 +0800

    Fix some bug about clear environment after testing (#8046)
---
 .../apache/iotdb/db/engine/StorageEngineV2.java    |   4 +-
 .../sync/pipedata/BufferedPipeDataQueueTest.java   | 475 +++++++++++----------
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   5 -
 3 files changed, 251 insertions(+), 233 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 19cd375a23..1889a1e326 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -394,7 +394,9 @@ public class StorageEngineV2 implements IService {
         seqMemtableTimedFlushCheckThread, ThreadName.TIMED_FlUSH_SEQ_MEMTABLE);
     ThreadUtils.stopThreadPool(
         unseqMemtableTimedFlushCheckThread, ThreadName.TIMED_FlUSH_UNSEQ_MEMTABLE);
-    cachedThreadPool.shutdownNow();
+    if (cachedThreadPool != null) {
+      cachedThreadPool.shutdownNow();
+    }
     dataRegionMap.clear();
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
index 94c679dbab..89dfa25bc5 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
@@ -43,7 +43,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class BufferedPipeDataQueueTest {
-  // test4
   private static final Logger logger = LoggerFactory.getLogger(BufferedPipeDataQueueTest.class);
 
   File pipeLogDir =
@@ -134,64 +133,70 @@ public class BufferedPipeDataQueueTest {
   @Test
   public void testTakeAndOffer() {
     BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
-    List<PipeData> pipeDatas = new ArrayList<>();
-    ExecutorService es1 = Executors.newSingleThreadExecutor();
-    es1.execute(
-        () -> {
-          try {
-            pipeDatas.add(pipeDataQueue.take());
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-        });
-    pipeDataQueue.offer(new TsFilePipeData("", 0));
     try {
-      Thread.sleep(3000);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-    es1.shutdownNow();
-    try {
-      Thread.sleep(500);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      Assert.fail();
+      List<PipeData> pipeDatas = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            try {
+              pipeDatas.add(pipeDataQueue.take());
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          });
+      pipeDataQueue.offer(new TsFilePipeData("", 0));
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      Assert.assertEquals(1, pipeDatas.size());
+    } finally {
+      pipeDataQueue.clear();
     }
-    Assert.assertEquals(1, pipeDatas.size());
-    pipeDataQueue.clear();
   }
 
   /** Try to offer data to a new pipe. */
   @Test
   public void testOfferNewPipe() {
     BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
-    PipeData pipeData = new TsFilePipeData("fakePath", 1);
-    pipeDataQueue.offer(pipeData);
-    List<PipeData> pipeDatas = new ArrayList<>();
-    ExecutorService es1 = Executors.newSingleThreadExecutor();
-    es1.execute(
-        () -> {
-          try {
-            pipeDatas.add(pipeDataQueue.take());
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-        });
-    try {
-      Thread.sleep(3000);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-    es1.shutdownNow();
     try {
-      Thread.sleep(500);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      Assert.fail();
+      PipeData pipeData = new TsFilePipeData("fakePath", 1);
+      pipeDataQueue.offer(pipeData);
+      List<PipeData> pipeDatas = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            try {
+              pipeDatas.add(pipeDataQueue.take());
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          });
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      Assert.assertEquals(1, pipeDatas.size());
+      Assert.assertEquals(pipeData, pipeDatas.get(0));
+    } finally {
+      pipeDataQueue.clear();
     }
-    Assert.assertEquals(1, pipeDatas.size());
-    Assert.assertEquals(pipeData, pipeDatas.get(0));
-    pipeDataQueue.clear();
   }
 
   /**
@@ -244,43 +249,47 @@ public class BufferedPipeDataQueueTest {
       pipeLogOutput3.close();
       // recovery
       BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
-      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
-      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
-      PipeData offerPipeData = new TsFilePipeData("fake11", 11);
-      pipeDataList.add(offerPipeData);
-      pipeDataQueue.offer(offerPipeData);
+      try {
 
-      // take and check
-      List<PipeData> pipeDataTakeList = new ArrayList<>();
-      ExecutorService es1 = Executors.newSingleThreadExecutor();
-      es1.execute(
-          () -> {
-            while (true) {
-              try {
-                pipeDataTakeList.add(pipeDataQueue.take());
-                pipeDataQueue.commit();
-              } catch (InterruptedException e) {
-                break;
+        Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+        Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+        PipeData offerPipeData = new TsFilePipeData("fake11", 11);
+        pipeDataList.add(offerPipeData);
+        pipeDataQueue.offer(offerPipeData);
+
+        // take and check
+        List<PipeData> pipeDataTakeList = new ArrayList<>();
+        ExecutorService es1 = Executors.newSingleThreadExecutor();
+        es1.execute(
+            () -> {
+              while (true) {
+                try {
+                  pipeDataTakeList.add(pipeDataQueue.take());
+                  pipeDataQueue.commit();
+                } catch (InterruptedException e) {
+                  break;
+                }
               }
-            }
-          });
-      try {
-        Thread.sleep(3000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      es1.shutdownNow();
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
-      Assert.assertEquals(10, pipeDataTakeList.size());
-      for (int i = 0; i < 10; i++) {
-        Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+            });
+        try {
+          Thread.sleep(3000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        es1.shutdownNow();
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          Assert.fail();
+        }
+        Assert.assertEquals(10, pipeDataTakeList.size());
+        for (int i = 0; i < 10; i++) {
+          Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+        }
+      } finally {
+        pipeDataQueue.clear();
       }
-      pipeDataQueue.clear();
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
@@ -334,40 +343,43 @@ public class BufferedPipeDataQueueTest {
       pipeLogOutput3.close();
       // recovery
       BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
-      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
-      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+      try {
+        Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+        Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
 
-      // take and check
-      List<PipeData> pipeDataTakeList = new ArrayList<>();
-      ExecutorService es1 = Executors.newSingleThreadExecutor();
-      es1.execute(
-          () -> {
-            while (true) {
-              try {
-                pipeDataTakeList.add(pipeDataQueue.take());
-                pipeDataQueue.commit();
-              } catch (InterruptedException e) {
-                break;
+        // take and check
+        List<PipeData> pipeDataTakeList = new ArrayList<>();
+        ExecutorService es1 = Executors.newSingleThreadExecutor();
+        es1.execute(
+            () -> {
+              while (true) {
+                try {
+                  pipeDataTakeList.add(pipeDataQueue.take());
+                  pipeDataQueue.commit();
+                } catch (InterruptedException e) {
+                  break;
+                }
               }
-            }
-          });
-      try {
-        Thread.sleep(3000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      es1.shutdownNow();
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
-      Assert.assertEquals(9, pipeDataTakeList.size());
-      for (int i = 0; i < 9; i++) {
-        Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+            });
+        try {
+          Thread.sleep(3000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        es1.shutdownNow();
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          Assert.fail();
+        }
+        Assert.assertEquals(9, pipeDataTakeList.size());
+        for (int i = 0; i < 9; i++) {
+          Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+        }
+      } finally {
+        pipeDataQueue.clear();
       }
-      pipeDataQueue.clear();
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
@@ -416,40 +428,43 @@ public class BufferedPipeDataQueueTest {
       ;
       // recovery
       BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
-      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
-      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+      try {
+        Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+        Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
 
-      // take and check
-      List<PipeData> pipeDataTakeList = new ArrayList<>();
-      ExecutorService es1 = Executors.newSingleThreadExecutor();
-      es1.execute(
-          () -> {
-            while (true) {
-              try {
-                pipeDataTakeList.add(pipeDataQueue.take());
-                pipeDataQueue.commit();
-              } catch (InterruptedException e) {
-                break;
+        // take and check
+        List<PipeData> pipeDataTakeList = new ArrayList<>();
+        ExecutorService es1 = Executors.newSingleThreadExecutor();
+        es1.execute(
+            () -> {
+              while (true) {
+                try {
+                  pipeDataTakeList.add(pipeDataQueue.take());
+                  pipeDataQueue.commit();
+                } catch (InterruptedException e) {
+                  break;
+                }
               }
-            }
-          });
-      try {
-        Thread.sleep(3000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      es1.shutdownNow();
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
-      Assert.assertEquals(9, pipeDataTakeList.size());
-      for (int i = 0; i < 9; i++) {
-        Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+            });
+        try {
+          Thread.sleep(3000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        es1.shutdownNow();
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          Assert.fail();
+        }
+        Assert.assertEquals(9, pipeDataTakeList.size());
+        for (int i = 0; i < 9; i++) {
+          Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+        }
+      } finally {
+        pipeDataQueue.clear();
       }
-      pipeDataQueue.clear();
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
@@ -497,48 +512,51 @@ public class BufferedPipeDataQueueTest {
       ;
       // recovery
       BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
-      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
-      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+      try {
+        Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+        Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
 
-      // take
-      List<PipeData> pipeDataTakeList = new ArrayList<>();
-      ExecutorService es1 = Executors.newSingleThreadExecutor();
-      es1.execute(
-          () -> {
-            while (true) {
-              try {
-                pipeDataTakeList.add(pipeDataQueue.take());
-                pipeDataQueue.commit();
-              } catch (InterruptedException e) {
-                break;
-              } catch (Exception e) {
-                e.printStackTrace();
-                break;
+        // take
+        List<PipeData> pipeDataTakeList = new ArrayList<>();
+        ExecutorService es1 = Executors.newSingleThreadExecutor();
+        es1.execute(
+            () -> {
+              while (true) {
+                try {
+                  pipeDataTakeList.add(pipeDataQueue.take());
+                  pipeDataQueue.commit();
+                } catch (InterruptedException e) {
+                  break;
+                } catch (Exception e) {
+                  e.printStackTrace();
+                  break;
+                }
               }
-            }
-          });
-      // offer
-      for (int i = 11; i < 20; i++) {
-        pipeDataQueue.offer(
-            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
-      }
-      try {
-        Thread.sleep(3000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      es1.shutdownNow();
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
-      Assert.assertEquals(18, pipeDataTakeList.size());
-      for (int i = 0; i < 9; i++) {
-        Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+            });
+        // offer
+        for (int i = 11; i < 20; i++) {
+          pipeDataQueue.offer(
+              new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+        }
+        try {
+          Thread.sleep(3000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        es1.shutdownNow();
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          Assert.fail();
+        }
+        Assert.assertEquals(18, pipeDataTakeList.size());
+        for (int i = 0; i < 9; i++) {
+          Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+        }
+      } finally {
+        pipeDataQueue.clear();
       }
-      pipeDataQueue.clear();
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
@@ -582,53 +600,56 @@ public class BufferedPipeDataQueueTest {
       ;
       // recovery
       BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
-      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
-      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+      try {
+        Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+        Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
 
-      // take
-      List<PipeData> pipeDataTakeList = new ArrayList<>();
-      ExecutorService es1 = Executors.newSingleThreadExecutor();
-      es1.execute(
-          () -> {
-            while (true) {
-              try {
-                PipeData pipeData = pipeDataQueue.take();
-                logger.info(String.format("PipeData: %s", pipeData));
-                pipeDataTakeList.add(pipeData);
-                pipeDataQueue.commit();
-              } catch (InterruptedException e) {
-                break;
-              } catch (Exception e) {
-                e.printStackTrace();
-                break;
+        // take
+        List<PipeData> pipeDataTakeList = new ArrayList<>();
+        ExecutorService es1 = Executors.newSingleThreadExecutor();
+        es1.execute(
+            () -> {
+              while (true) {
+                try {
+                  PipeData pipeData = pipeDataQueue.take();
+                  logger.info(String.format("PipeData: %s", pipeData));
+                  pipeDataTakeList.add(pipeData);
+                  pipeDataQueue.commit();
+                } catch (InterruptedException e) {
+                  break;
+                } catch (Exception e) {
+                  e.printStackTrace();
+                  break;
+                }
               }
-            }
-          });
-      // offer
-      for (int i = 16; i < 20; i++) {
-        if (!pipeDataQueue.offer(
-            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i))) {
-          logger.info(String.format("Can not offer serialize number %d", i));
+            });
+        // offer
+        for (int i = 16; i < 20; i++) {
+          if (!pipeDataQueue.offer(
+              new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i))) {
+            logger.info(String.format("Can not offer serialize number %d", i));
+          }
         }
+        try {
+          Thread.sleep(3000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        es1.shutdownNow();
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          Assert.fail();
+        }
+        logger.info(String.format("PipeDataTakeList: %s", pipeDataTakeList));
+        Assert.assertEquals(10, pipeDataTakeList.size());
+        for (int i = 0; i < 6; i++) {
+          Assert.assertEquals(pipeDataList.get(i), pipeDataTakeList.get(i));
+        }
+      } finally {
+        pipeDataQueue.clear();
       }
-      try {
-        Thread.sleep(3000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      es1.shutdownNow();
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
-      logger.info(String.format("PipeDataTakeList: %s", pipeDataTakeList));
-      Assert.assertEquals(10, pipeDataTakeList.size());
-      for (int i = 0; i < 6; i++) {
-        Assert.assertEquals(pipeDataList.get(i), pipeDataTakeList.get(i));
-      }
-      pipeDataQueue.clear();
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail();
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index e74aad229f..9dea24d716 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -148,11 +148,6 @@ public class EnvironmentUtils {
     WALRecoverManager.getInstance().clear();
 
     StorageEngineV2.getInstance().stop();
-    // clean database manager
-    //    if (!StorageEngine.getInstance().deleteAll()) {
-    //      logger.error("Can't close the database manager in EnvironmentUtils");
-    //      fail();
-    //    }
 
     CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.Running);
     // We must disable MQTT service as it will cost a lot of time to be shutdown, which may slow our