You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/10/20 07:19:33 UTC

[GitHub] [iotdb] yschengzi commented on a diff in pull request #7664: [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber

yschengzi commented on code in PR #7664:
URL: https://github.com/apache/iotdb/pull/7664#discussion_r1000234225


##########
server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java:
##########
@@ -540,4 +543,92 @@ public void testOfferWhileTaking() {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+      PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+      pipeDataList.add(tsFile3PipeData);
+      tsFile3PipeData.serialize(pipeLogOutput1);
+      pipeLogOutput1.close();
+      // pipelog2: 4,5,6,7,10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      PipeData schema10PipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+      pipeDataList.add(schema10PipeData);
+      schema10PipeData.serialize(pipeLogOutput2);
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                PipeData pipeData = pipeDataQueue.take();
+                System.out.println(pipeData);

Review Comment:
   Done.



##########
server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java:
##########
@@ -540,4 +543,92 @@ public void testOfferWhileTaking() {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+      PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+      pipeDataList.add(tsFile3PipeData);
+      tsFile3PipeData.serialize(pipeLogOutput1);
+      pipeLogOutput1.close();
+      // pipelog2: 4,5,6,7,10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      PipeData schema10PipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+      pipeDataList.add(schema10PipeData);
+      schema10PipeData.serialize(pipeLogOutput2);
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                PipeData pipeData = pipeDataQueue.take();
+                System.out.println(pipeData);
+                pipeDataTakeList.add(pipeData);
+                pipeDataQueue.commit();
+              } catch (InterruptedException e) {
+                break;
+              } catch (Exception e) {
+                e.printStackTrace();
+                break;
+              }
+            }
+          });
+      // offer
+      for (int i = 16; i < 20; i++) {
+        pipeDataQueue.offer(
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      System.out.println(pipeDataTakeList);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org