You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/24 04:03:33 UTC
[iotdb] branch master updated: [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber (#7664)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6101e132d6 [IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber (#7664)
6101e132d6 is described below
commit 6101e132d6e017a27088cb456018ad06297a35cf
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Mon Oct 24 12:03:28 2022 +0800
[IOTDB-4293] BufferedPipeDataQueue supports discontinuous serialNumber (#7664)
---
.../sync/pipedata/queue/BufferedPipeDataQueue.java | 70 ++++++----------
.../sync/pipedata/BufferedPipeDataQueueTest.java | 95 ++++++++++++++++++++++
2 files changed, 119 insertions(+), 46 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
index 8d5a23d4b1..5ca4890a03 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,8 +125,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
: recoverPipeData.get(recoverPipeDataSize - 1).getSerialNumber();
} catch (IOException e) {
logger.error(
- String.format(
- "Can not recover inputQueue from %s, because %s.", writingPipeLog.getPath(), e));
+ String.format("Can not recover inputQueue from %s.", writingPipeLog.getPath()), e);
}
}
@@ -146,8 +146,9 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
} catch (IOException e) {
logger.error(
String.format(
- "deserialize remove serial number error, remove serial number has been set to %d, because %s",
- commitSerialNumber, e));
+ "deserialize remove serial number error, remove serial number has been set to %d.",
+ commitSerialNumber),
+ e);
}
}
@@ -170,9 +171,8 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
}
} catch (IOException e) {
logger.error(
- String.format(
- "Recover output deque from pipe log %s error, because %s.",
- readingPipeLog.getPath(), e));
+ String.format("Recover output deque from pipe log %s error.", readingPipeLog.getPath()),
+ e);
}
}
@@ -191,7 +191,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
try {
moveToNextPipeLog(pipeData.getSerialNumber());
} catch (IOException e) {
- logger.error(String.format("Move to next pipe log %s error, because %s.", pipeData, e));
+ logger.error(String.format("Move to next pipe log %s error.", pipeData), e);
}
}
if (!inputDeque.offer(pipeData)) {
@@ -204,7 +204,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
try {
writeToDisk(pipeData);
} catch (IOException e) {
- logger.error(String.format("Record pipe data %s error, because %s.", pipeData, e));
+ logger.error(String.format("Record pipe data %s error.", pipeData), e);
return false;
}
return true;
@@ -249,8 +249,10 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
} else if (serialNumber == pipeLogStartNumber.peekLast() && inputDeque != null) {
outputDeque = inputDeque;
} else {
+ long nextStartNumber =
+ pipeLogStartNumber.stream().filter(o -> o >= serialNumber).findFirst().get();
List<PipeData> parsePipeData =
- parsePipeLog(new File(pipeLogDir, SyncPathUtil.getPipeLogName(serialNumber)));
+ parsePipeLog(new File(pipeLogDir, SyncPathUtil.getPipeLogName(nextStartNumber)));
int parsePipeDataSize = parsePipeData.size();
outputDeque = new LinkedBlockingDeque<>();
for (int i = 0; i < parsePipeDataSize; i++) {
@@ -264,30 +266,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
@Override
public List<PipeData> pull(long serialNumber) {
- List<PipeData> resPipeData = new ArrayList<>();
-
- pullSerialNumber = commitSerialNumber;
- while (pullSerialNumber < serialNumber) {
- try {
- PipeData pullPipeData = pullOnePipeData(pullSerialNumber);
- if (pullPipeData != null) {
- resPipeData.add(pullPipeData);
- pullSerialNumber = pullPipeData.getSerialNumber();
- } else {
- break;
- }
- } catch (IOException e) {
- logger.error(
- String.format(
- "Pull pipe data serial number %s error, because %s.", pullSerialNumber + 1, e));
- break;
- }
- }
-
- for (int i = resPipeData.size() - 1; i >= 0; --i) {
- outputDeque.addFirst(resPipeData.get(i));
- }
- return resPipeData;
+ throw new NotImplementedException("Not implement pull");
}
@Override
@@ -302,8 +281,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
}
} catch (IOException e) {
logger.error(
- String.format(
- "Blocking pull pipe data number %s error, because %s", commitSerialNumber + 1, e));
+ String.format("Blocking pull pipe data number %s error.", commitSerialNumber + 1), e);
}
outputDeque.addFirst(pipeData);
pullSerialNumber = pipeData.getSerialNumber();
@@ -327,11 +305,11 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
private void deletePipeData(long serialNumber) {
while (commitSerialNumber < serialNumber) {
- commitSerialNumber += 1;
+ PipeData commitData = null;
try {
- PipeData commitData = pullOnePipeData(commitSerialNumber);
+ commitData = pullOnePipeData(commitSerialNumber);
if (commitData == null) {
- continue;
+ return;
}
if (PipeData.PipeDataType.TSFILE.equals(commitData.getType())) {
List<File> tsFiles = ((TsFilePipeData) commitData).getTsFiles(false);
@@ -341,8 +319,10 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
}
} catch (IOException e) {
logger.error(
- String.format(
- "Commit pipe data serial number %s error, because %s.", commitSerialNumber, e));
+ String.format("Commit pipe data serial number %s error.", commitSerialNumber), e);
+ }
+ if (commitData != null) {
+ commitSerialNumber = commitData.getSerialNumber();
}
}
}
@@ -357,8 +337,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
Files.deleteIfExists(
new File(pipeLogDir, SyncPathUtil.getPipeLogName(nowPipeLogStartNumber)).toPath());
} catch (IOException e) {
- logger.warn(
- String.format("Delete %s-pipe.log error, because %s.", nowPipeLogStartNumber, e));
+ logger.warn(String.format("Delete %s-pipe.log error.", nowPipeLogStartNumber), e);
}
} else {
break;
@@ -385,8 +364,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
}
} catch (IOException e) {
logger.error(
- String.format(
- "Serialize commit serial number %s error, because %s.", commitSerialNumber, e));
+ String.format("Serialize commit serial number %s error.", commitSerialNumber), e);
}
}
@@ -439,7 +417,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
}
} catch (EOFException e) {
} catch (IllegalPathException e) {
- logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
+ logger.error(String.format("Parsing pipeLog %s error.", file.getPath()), e);
throw new IOException(e);
}
return pipeData;
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
index 464231b909..fec84fef68 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
@@ -32,6 +32,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
@@ -43,6 +45,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BufferedPipeDataQueueTest {
+ private static final Logger logger = LoggerFactory.getLogger(BufferedPipeDataQueueTest.class);
+
File pipeLogDir =
new File(
SyncPathUtil.getReceiverPipeLogDir("pipe", "192.168.0.11", System.currentTimeMillis()));
@@ -510,6 +514,9 @@ public class BufferedPipeDataQueueTest {
pipeDataQueue.commit();
} catch (InterruptedException e) {
break;
+ } catch (Exception e) {
+ e.printStackTrace();
+ break;
}
}
});
@@ -540,4 +547,92 @@ public class BufferedPipeDataQueueTest {
Assert.fail();
}
}
+
+ @Test
+ public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
+ try {
+ DataOutputStream outputStream =
+ new DataOutputStream(
+ new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+ outputStream.writeLong(1);
+ outputStream.close();
+ List<PipeData> pipeDataList = new ArrayList<>();
+ // pipelog1: 3
+ DataOutputStream pipeLogOutput1 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
+ PipeData tsFile3PipeData = new TsFilePipeData("fake3", 3);
+ pipeDataList.add(tsFile3PipeData);
+ tsFile3PipeData.serialize(pipeLogOutput1);
+ pipeLogOutput1.close();
+ // pipelog2: 4,5,6,7,10
+ DataOutputStream pipeLogOutput2 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
+ for (int i = 4; i < 8; i++) {
+ PipeData pipeData =
+ new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput2);
+ }
+ PipeData schema10PipeData =
+ new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake10")), 10);
+ pipeDataList.add(schema10PipeData);
+ schema10PipeData.serialize(pipeLogOutput2);
+ pipeLogOutput2.close();
+ ;
+ // recovery
+ BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+ // take
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ PipeData pipeData = pipeDataQueue.take();
+ logger.info(String.format("PipeData: %s", pipeData));
+ pipeDataTakeList.add(pipeData);
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ } catch (Exception e) {
+ e.printStackTrace();
+ break;
+ }
+ }
+ });
+ // offer
+ for (int i = 16; i < 20; i++) {
+ pipeDataQueue.offer(
+ new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+ }
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ logger.info(String.format("PipeDataTakeList: %s", pipeDataTakeList));
+ Assert.assertEquals(10, pipeDataTakeList.size());
+ for (int i = 0; i < 6; i++) {
+ Assert.assertEquals(pipeDataList.get(i), pipeDataTakeList.get(i));
+ }
+ pipeDataQueue.clear();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
}