You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/10/19 12:10:53 UTC

[GitHub] [iotdb] yschengzi opened a new pull request, #7664: [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber

yschengzi opened a new pull request, #7664:
URL: https://github.com/apache/iotdb/pull/7664

   Add commit(long serialNumber) to interface PipeDataQueue
   When serialNum is not discontinuous (e.g. 1, 2, 5, 8, 10),  the correctness of the interface in PipeDataQueue should be ensured.
   Add UT for discontinuous serialNumber case


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] yschengzi commented on a diff in pull request #7664: [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber

Posted by GitBox <gi...@apache.org>.
yschengzi commented on code in PR #7664:
URL: https://github.com/apache/iotdb/pull/7664#discussion_r1000234225


##########
server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java:
##########
@@ -540,4 +543,92 @@ public void testOfferWhileTaking() {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+      PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+      pipeDataList.add(tsFile3PipeData);
+      tsFile3PipeData.serialize(pipeLogOutput1);
+      pipeLogOutput1.close();
+      // pipelog2: 4,5,6,7,10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      PipeData schema10PipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+      pipeDataList.add(schema10PipeData);
+      schema10PipeData.serialize(pipeLogOutput2);
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                PipeData pipeData = pipeDataQueue.take();
+                System.out.println(pipeData);

Review Comment:
   done.



##########
server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java:
##########
@@ -540,4 +543,92 @@ public void testOfferWhileTaking() {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+      PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+      pipeDataList.add(tsFile3PipeData);
+      tsFile3PipeData.serialize(pipeLogOutput1);
+      pipeLogOutput1.close();
+      // pipelog2: 4,5,6,7,10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      PipeData schema10PipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+      pipeDataList.add(schema10PipeData);
+      schema10PipeData.serialize(pipeLogOutput2);
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                PipeData pipeData = pipeDataQueue.take();
+                System.out.println(pipeData);
+                pipeDataTakeList.add(pipeData);
+                pipeDataQueue.commit();
+              } catch (InterruptedException e) {
+                break;
+              } catch (Exception e) {
+                e.printStackTrace();
+                break;
+              }
+            }
+          });
+      // offer
+      for (int i = 16; i < 20; i++) {
+        pipeDataQueue.offer(
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      System.out.println(pipeDataTakeList);

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] HTHou merged pull request #7664: [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber

Posted by GitBox <gi...@apache.org>.
HTHou merged PR #7664:
URL: https://github.com/apache/iotdb/pull/7664


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [iotdb] Cpaulyz commented on a diff in pull request #7664: [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber

Posted by GitBox <gi...@apache.org>.
Cpaulyz commented on code in PR #7664:
URL: https://github.com/apache/iotdb/pull/7664#discussion_r1000113026


##########
server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java:
##########
@@ -540,4 +543,92 @@ public void testOfferWhileTaking() {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+      PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+      pipeDataList.add(tsFile3PipeData);
+      tsFile3PipeData.serialize(pipeLogOutput1);
+      pipeLogOutput1.close();
+      // pipelog2: 4,5,6,7,10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      PipeData schema10PipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+      pipeDataList.add(schema10PipeData);
+      schema10PipeData.serialize(pipeLogOutput2);
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                PipeData pipeData = pipeDataQueue.take();
+                System.out.println(pipeData);
+                pipeDataTakeList.add(pipeData);
+                pipeDataQueue.commit();
+              } catch (InterruptedException e) {
+                break;
+              } catch (Exception e) {
+                e.printStackTrace();
+                break;
+              }
+            }
+          });
+      // offer
+      for (int i = 16; i < 20; i++) {
+        pipeDataQueue.offer(
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      System.out.println(pipeDataTakeList);

Review Comment:
   remove or use logger



##########
server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java:
##########
@@ -540,4 +543,92 @@ public void testOfferWhileTaking() {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+      PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+      pipeDataList.add(tsFile3PipeData);
+      tsFile3PipeData.serialize(pipeLogOutput1);
+      pipeLogOutput1.close();
+      // pipelog2: 4,5,6,7,10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      PipeData schema10PipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+      pipeDataList.add(schema10PipeData);
+      schema10PipeData.serialize(pipeLogOutput2);
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                PipeData pipeData = pipeDataQueue.take();
+                System.out.println(pipeData);

Review Comment:
   remove or use logger



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org