You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/08/09 08:00:58 UTC
[iotdb] branch master updated: [IOTDB-4046] Enhance PipeData serialize()/deserialize() codes (#6902)
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0cac2c3237 [IOTDB-4046] Enhance PipeData serialize()/deserialize() codes (#6902)
0cac2c3237 is described below
commit 0cac2c32370c537d9f524ea328ee402ae31fbea5
Author: Jamber <ja...@sina.com>
AuthorDate: Tue Aug 9 16:00:52 2022 +0800
[IOTDB-4046] Enhance PipeData serialize()/deserialize() codes (#6902)
---
.../iotdb/db/sync/pipedata/DeletionPipeData.java | 12 ++++++-----
.../apache/iotdb/db/sync/pipedata/PipeData.java | 24 ++++++++++++++++------
.../iotdb/db/sync/pipedata/SchemaPipeData.java | 12 ++++++-----
.../iotdb/db/sync/pipedata/TsFilePipeData.java | 19 ++++++++++++-----
.../sync/pipedata/queue/BufferedPipeDataQueue.java | 2 +-
.../transport/server/TransportServiceImpl.java | 2 +-
.../iotdb/db/sync/pipedata/PipeDataTest.java | 12 +++++------
7 files changed, 54 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
index 631d829128..c1b3719cc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
@@ -37,6 +37,10 @@ public class DeletionPipeData extends PipeData {
private Deletion deletion;
+ public DeletionPipeData() {
+ super();
+ }
+
public DeletionPipeData(Deletion deletion, long serialNumber) {
super(serialNumber);
this.deletion = deletion;
@@ -52,11 +56,9 @@ public class DeletionPipeData extends PipeData {
return super.serialize(stream) + deletion.serializeWithoutFileOffset(stream);
}
- public static DeletionPipeData deserialize(DataInputStream stream)
- throws IOException, IllegalPathException {
- long serialNumber = stream.readLong();
- Deletion deletion = Deletion.deserializeWithoutFileOffset(stream);
- return new DeletionPipeData(deletion, serialNumber);
+ public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
+ super.deserialize(stream);
+ deletion = Deletion.deserializeWithoutFileOffset(stream);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
index 7d0b6a3167..7d9adf1249 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
@@ -36,6 +36,8 @@ public abstract class PipeData {
protected long serialNumber;
+ public PipeData() {}
+
public PipeData(long serialNumber) {
this.serialNumber = serialNumber;
}
@@ -65,25 +67,35 @@ public abstract class PipeData {
return byteStream.toByteArray();
}
- public static PipeData deserialize(DataInputStream stream)
+ public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
+ serialNumber = stream.readLong();
+ }
+
+ public static PipeData createPipeData(DataInputStream stream)
throws IOException, IllegalPathException {
+ PipeData pipeData;
PipeDataType type = PipeDataType.values()[stream.readByte()];
switch (type) {
case TSFILE:
- return TsFilePipeData.deserialize(stream);
+ pipeData = new TsFilePipeData();
+ break;
case DELETION:
- return DeletionPipeData.deserialize(stream);
+ pipeData = new DeletionPipeData();
+ break;
case SCHEMA:
- return SchemaPipeData.deserialize(stream);
+ pipeData = new SchemaPipeData();
+ break;
default:
logger.error("Deserialize PipeData error because Unknown type {}.", type);
throw new UnsupportedOperationException(
"Deserialize PipeData error because Unknown type " + type);
}
+ pipeData.deserialize(stream);
+ return pipeData;
}
- public static PipeData deserialize(byte[] bytes) throws IllegalPathException, IOException {
- return deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+ public static PipeData createPipeData(byte[] bytes) throws IllegalPathException, IOException {
+ return createPipeData(new DataInputStream(new ByteArrayInputStream(bytes)));
}
public abstract ILoader createLoader();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
index 5dc1e7c104..ee1d342f24 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
@@ -35,6 +35,10 @@ public class SchemaPipeData extends PipeData {
private PhysicalPlan plan;
+ public SchemaPipeData() {
+ super();
+ }
+
public SchemaPipeData(PhysicalPlan plan, long serialNumber) {
super(serialNumber);
this.plan = plan;
@@ -64,13 +68,11 @@ public class SchemaPipeData extends PipeData {
return bytes;
}
- public static SchemaPipeData deserialize(DataInputStream stream)
- throws IOException, IllegalPathException {
- long serialNumber = stream.readLong();
+ public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
+ super.deserialize(stream);
byte[] bytes = new byte[stream.readInt()];
stream.read(bytes);
- PhysicalPlan plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
- return new SchemaPipeData(plan, serialNumber);
+ plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
index ac01553b3e..7fbdcadd23 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
@@ -19,6 +19,7 @@
*/
package org.apache.iotdb.db.sync.pipedata;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.sync.conf.SyncConstant;
@@ -45,6 +46,10 @@ public class TsFilePipeData extends PipeData {
private String tsFileName;
private String storageGroupName;
+ public TsFilePipeData() {
+ super();
+ }
+
public TsFilePipeData(String tsFilePath, long serialNumber) {
super(serialNumber);
String sep = File.separator.equals("\\") ? "\\\\" : File.separator;
@@ -115,11 +120,15 @@ public class TsFilePipeData extends PipeData {
+ ReadWriteIOUtils.write(tsFileName, stream);
}
- public static TsFilePipeData deserialize(DataInputStream stream) throws IOException {
- long serialNumber = stream.readLong();
- String parentDirPath = ReadWriteIOUtils.readString(stream);
- String tsFileName = ReadWriteIOUtils.readString(stream);
- return new TsFilePipeData(parentDirPath == null ? "" : parentDirPath, tsFileName, serialNumber);
+ @Override
+ public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
+ super.deserialize(stream);
+ parentDirPath = ReadWriteIOUtils.readString(stream);
+ if (parentDirPath == null) {
+ parentDirPath = "";
+ }
+ tsFileName = ReadWriteIOUtils.readString(stream);
+ initStorageGroupName();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
index 7df0161272..45f3af881c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
@@ -434,7 +434,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
List<PipeData> pipeData = new ArrayList<>();
try (DataInputStream inputStream = new DataInputStream(new FileInputStream(file))) {
while (true) {
- pipeData.add(PipeData.deserialize(inputStream));
+ pipeData.add(PipeData.createPipeData(inputStream));
}
} catch (EOFException e) {
} catch (IllegalPathException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
index e52560db24..1713b4e2db 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
@@ -253,7 +253,7 @@ public class TransportServiceImpl implements TransportService.Iface {
byte[] byteArray = new byte[length];
buff.get(byteArray);
try {
- PipeData pipeData = PipeData.deserialize(byteArray);
+ PipeData pipeData = PipeData.createPipeData(byteArray);
if (type == Type.TSFILE) {
// Do with file
handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java
index 8dd91f78ff..a9498eda6e 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java
@@ -70,19 +70,19 @@ public class PipeDataTest {
pipeData1.serialize(outputStream);
outputStream.flush();
DataInputStream inputStream = new DataInputStream(new FileInputStream(f1));
- Assert.assertEquals(pipeData1, PipeData.deserialize(inputStream));
+ Assert.assertEquals(pipeData1, PipeData.createPipeData(inputStream));
pipeData2.serialize(outputStream);
outputStream.flush();
- Assert.assertEquals(pipeData2, PipeData.deserialize(inputStream));
+ Assert.assertEquals(pipeData2, PipeData.createPipeData(inputStream));
pipeData3.serialize(outputStream);
outputStream.flush();
- Assert.assertEquals(pipeData3, PipeData.deserialize(inputStream));
+ Assert.assertEquals(pipeData3, PipeData.createPipeData(inputStream));
inputStream.close();
outputStream.close();
- Assert.assertEquals(pipeData1, PipeData.deserialize(pipeData1.serialize()));
- Assert.assertEquals(pipeData2, PipeData.deserialize(pipeData2.serialize()));
- Assert.assertEquals(pipeData3, PipeData.deserialize(pipeData3.serialize()));
+ Assert.assertEquals(pipeData1, PipeData.createPipeData(pipeData1.serialize()));
+ Assert.assertEquals(pipeData2, PipeData.createPipeData(pipeData2.serialize()));
+ Assert.assertEquals(pipeData3, PipeData.createPipeData(pipeData3.serialize()));
} catch (Exception e) {
logger.error(e.getMessage());
Assert.fail();