You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/24 04:03:33 UTC

[iotdb] branch master updated: [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber (#7664)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6101e132d6 [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber (#7664)
6101e132d6 is described below

commit 6101e132d6e017a27088cb456018ad06297a35cf
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Mon Oct 24 12:03:28 2022 +0800

    [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber (#7664)
---
 .../sync/pipedata/queue/BufferedPipeDataQueue.java | 70 ++++++----------
 .../sync/pipedata/BufferedPipeDataQueueTest.java   | 95 ++++++++++++++++++++++
 2 files changed, 119 insertions(+), 46 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
index 8d5a23d4b1..5ca4890a03 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,8 +125,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
               : recoverPipeData.get(recoverPipeDataSize - 1).getSerialNumber();
     } catch (IOException e) {
       logger.error(
-          String.format(
-              "Can not recover inputQueue from %s, because %s.", writingPipeLog.getPath(), e));
+          String.format("Can not recover inputQueue from %s.", writingPipeLog.getPath()), e);
     }
   }
 
@@ -146,8 +146,9 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
     } catch (IOException e) {
       logger.error(
           String.format(
-              "deserialize remove serial number error, remove serial number has been set to %d, because %s",
-              commitSerialNumber, e));
+              "deserialize remove serial number error, remove serial number has been set to %d.",
+              commitSerialNumber),
+          e);
     }
   }
 
@@ -170,9 +171,8 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
       }
     } catch (IOException e) {
       logger.error(
-          String.format(
-              "Recover output deque from pipe log %s error, because %s.",
-              readingPipeLog.getPath(), e));
+          String.format("Recover output deque from pipe log %s error.", readingPipeLog.getPath()),
+          e);
     }
   }
 
@@ -191,7 +191,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
       try {
         moveToNextPipeLog(pipeData.getSerialNumber());
       } catch (IOException e) {
-        logger.error(String.format("Move to next pipe log %s error, because %s.", pipeData, e));
+        logger.error(String.format("Move to next pipe log %s error.", pipeData), e);
       }
     }
     if (!inputDeque.offer(pipeData)) {
@@ -204,7 +204,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
     try {
       writeToDisk(pipeData);
     } catch (IOException e) {
-      logger.error(String.format("Record pipe data %s error, because %s.", pipeData, e));
+      logger.error(String.format("Record pipe data %s error.", pipeData), e);
       return false;
     }
     return true;
@@ -249,8 +249,10 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
       } else if (serialNumber == pipeLogStartNumber.peekLast() && inputDeque != null) {
         outputDeque = inputDeque;
       } else {
+        long nextStartNumber =
+            pipeLogStartNumber.stream().filter(o -> o >= serialNumber).findFirst().get();
         List<PipeData> parsePipeData =
-            parsePipeLog(new File(pipeLogDir, SyncPathUtil.getPipeLogName(serialNumber)));
+            parsePipeLog(new File(pipeLogDir, SyncPathUtil.getPipeLogName(nextStartNumber)));
         int parsePipeDataSize = parsePipeData.size();
         outputDeque = new LinkedBlockingDeque<>();
         for (int i = 0; i < parsePipeDataSize; i++) {
@@ -264,30 +266,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
 
   @Override
   public List<PipeData> pull(long serialNumber) {
-    List<PipeData> resPipeData = new ArrayList<>();
-
-    pullSerialNumber = commitSerialNumber;
-    while (pullSerialNumber < serialNumber) {
-      try {
-        PipeData pullPipeData = pullOnePipeData(pullSerialNumber);
-        if (pullPipeData != null) {
-          resPipeData.add(pullPipeData);
-          pullSerialNumber = pullPipeData.getSerialNumber();
-        } else {
-          break;
-        }
-      } catch (IOException e) {
-        logger.error(
-            String.format(
-                "Pull pipe data serial number %s error, because %s.", pullSerialNumber + 1, e));
-        break;
-      }
-    }
-
-    for (int i = resPipeData.size() - 1; i >= 0; --i) {
-      outputDeque.addFirst(resPipeData.get(i));
-    }
-    return resPipeData;
+    throw new NotImplementedException("Not implement pull");
   }
 
   @Override
@@ -302,8 +281,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
       }
     } catch (IOException e) {
       logger.error(
-          String.format(
-              "Blocking pull pipe data number %s error, because %s", commitSerialNumber + 1, e));
+          String.format("Blocking pull pipe data number %s error.", commitSerialNumber + 1), e);
     }
     outputDeque.addFirst(pipeData);
     pullSerialNumber = pipeData.getSerialNumber();
@@ -327,11 +305,11 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
 
   private void deletePipeData(long serialNumber) {
     while (commitSerialNumber < serialNumber) {
-      commitSerialNumber += 1;
+      PipeData commitData = null;
       try {
-        PipeData commitData = pullOnePipeData(commitSerialNumber);
+        commitData = pullOnePipeData(commitSerialNumber);
         if (commitData == null) {
-          continue;
+          return;
         }
         if (PipeData.PipeDataType.TSFILE.equals(commitData.getType())) {
           List<File> tsFiles = ((TsFilePipeData) commitData).getTsFiles(false);
@@ -341,8 +319,10 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
         }
       } catch (IOException e) {
         logger.error(
-            String.format(
-                "Commit pipe data serial number %s error, because %s.", commitSerialNumber, e));
+            String.format("Commit pipe data serial number %s error.", commitSerialNumber), e);
+      }
+      if (commitData != null) {
+        commitSerialNumber = commitData.getSerialNumber();
       }
     }
   }
@@ -357,8 +337,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
             Files.deleteIfExists(
                 new File(pipeLogDir, SyncPathUtil.getPipeLogName(nowPipeLogStartNumber)).toPath());
           } catch (IOException e) {
-            logger.warn(
-                String.format("Delete %s-pipe.log error, because %s.", nowPipeLogStartNumber, e));
+            logger.warn(String.format("Delete %s-pipe.log error.", nowPipeLogStartNumber), e);
           }
         } else {
           break;
@@ -385,8 +364,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
       }
     } catch (IOException e) {
       logger.error(
-          String.format(
-              "Serialize commit serial number %s error, because %s.", commitSerialNumber, e));
+          String.format("Serialize commit serial number %s error.", commitSerialNumber), e);
     }
   }
 
@@ -439,7 +417,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
       }
     } catch (EOFException e) {
     } catch (IllegalPathException e) {
-      logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
+      logger.error(String.format("Parsing pipeLog %s error.", file.getPath()), e);
       throw new IOException(e);
     }
     return pipeData;
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
index 464231b909..fec84fef68 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
@@ -32,6 +32,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.File;
@@ -43,6 +45,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class BufferedPipeDataQueueTest {
+  private static final Logger logger = LoggerFactory.getLogger(BufferedPipeDataQueueTest.class);
+
   File pipeLogDir =
       new File(
           SyncPathUtil.getReceiverPipeLogDir("pipe", "192.168.0.11", System.currentTimeMillis()));
@@ -510,6 +514,9 @@ public class BufferedPipeDataQueueTest {
                 pipeDataQueue.commit();
               } catch (InterruptedException e) {
                 break;
+              } catch (Exception e) {
+                e.printStackTrace();
+                break;
               }
             }
           });
@@ -540,4 +547,92 @@ public class BufferedPipeDataQueueTest {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+    try {
+      DataOutputStream outputStream =
+          new DataOutputStream(
+              new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+      outputStream.writeLong(1);
+      outputStream.close();
+      List<PipeData> pipeDataList = new ArrayList<>();
+      // pipelog1: 3
+      DataOutputStream pipeLogOutput1 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+      PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+      pipeDataList.add(tsFile3PipeData);
+      tsFile3PipeData.serialize(pipeLogOutput1);
+      pipeLogOutput1.close();
+      // pipelog2: 4,5,6,7,10
+      DataOutputStream pipeLogOutput2 =
+          new DataOutputStream(
+              new FileOutputStream(
+                  new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+      for (int i = 4; i < 8; i++) {
+        PipeData pipeData =
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+        pipeDataList.add(pipeData);
+        pipeData.serialize(pipeLogOutput2);
+      }
+      PipeData schema10PipeData =
+          new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+      pipeDataList.add(schema10PipeData);
+      schema10PipeData.serialize(pipeLogOutput2);
+      pipeLogOutput2.close();
+      ;
+      // recovery
+      BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+      Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+      Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+      // take
+      List<PipeData> pipeDataTakeList = new ArrayList<>();
+      ExecutorService es1 = Executors.newSingleThreadExecutor();
+      es1.execute(
+          () -> {
+            while (true) {
+              try {
+                PipeData pipeData = pipeDataQueue.take();
+                logger.info(String.format("PipeData: %s", pipeData));
+                pipeDataTakeList.add(pipeData);
+                pipeDataQueue.commit();
+              } catch (InterruptedException e) {
+                break;
+              } catch (Exception e) {
+                e.printStackTrace();
+                break;
+              }
+            }
+          });
+      // offer
+      for (int i = 16; i < 20; i++) {
+        pipeDataQueue.offer(
+            new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      es1.shutdownNow();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      logger.info(String.format("PipeDataTakeList: %s", pipeDataTakeList));
+      Assert.assertEquals(10, pipeDataTakeList.size());
+      for (int i = 0; i < 6; i++) {
+        Assert.assertEquals(pipeDataList.get(i), pipeDataTakeList.get(i));
+      }
+      pipeDataQueue.clear();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+  }
 }