You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/10/20 04:15:51 UTC

[GitHub] [iotdb] Cpaulyz commented on a diff in pull request #7664: [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber

Cpaulyz commented on code in PR #7664:
URL: https://github.com/apache/iotdb/pull/7664#discussion_r1000113026


##########
server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java:
##########
@@ -540,4 +543,92 @@ public void testOfferWhileTaking() {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+      PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+      pipeDataList.add(tsFile3PipeData);
+      tsFile3PipeData.serialize(pipeLogOutput1);
+      pipeLogOutput1.close();
+      // pipelog2: 4,5,6,7,10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      PipeData schema10PipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+      pipeDataList.add(schema10PipeData);
+      schema10PipeData.serialize(pipeLogOutput2);
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                PipeData pipeData = pipeDataQueue.take();
+                System.out.println(pipeData);
+                pipeDataTakeList.add(pipeData);
+                pipeDataQueue.commit();
+              } catch (InterruptedException e) {
+                break;
+              } catch (Exception e) {
+                e.printStackTrace();
+                break;
+              }
+            }
+          });
+      // offer
+      for (int i = 16; i < 20; i++) {
+        pipeDataQueue.offer(
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      System.out.println(pipeDataTakeList);

Review Comment:
   remove or use logger



##########
server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java:
##########
@@ -540,4 +543,92 @@ public void testOfferWhileTaking() {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+      PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+      pipeDataList.add(tsFile3PipeData);
+      tsFile3PipeData.serialize(pipeLogOutput1);
+      pipeLogOutput1.close();
+      // pipelog2: 4,5,6,7,10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      PipeData schema10PipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+      pipeDataList.add(schema10PipeData);
+      schema10PipeData.serialize(pipeLogOutput2);
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                PipeData pipeData = pipeDataQueue.take();
+                System.out.println(pipeData);

Review Comment:
   remove or use logger



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org