You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/08/09 08:00:58 UTC

[iotdb] branch master updated: [IOTDB-4046] Enhance PipeData serialize()/deserialize() codes (#6902)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cac2c3237 [IOTDB-4046] Enhance PipeData serialize()/deserialize() codes (#6902)
0cac2c3237 is described below

commit 0cac2c32370c537d9f524ea328ee402ae31fbea5
Author: Jamber <ja...@sina.com>
AuthorDate: Tue Aug 9 16:00:52 2022 +0800

    [IOTDB-4046] Enhance PipeData serialize()/deserialize() codes (#6902)
---
 .../iotdb/db/sync/pipedata/DeletionPipeData.java   | 12 ++++++-----
 .../apache/iotdb/db/sync/pipedata/PipeData.java    | 24 ++++++++++++++++------
 .../iotdb/db/sync/pipedata/SchemaPipeData.java     | 12 ++++++-----
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     | 19 ++++++++++++-----
 .../sync/pipedata/queue/BufferedPipeDataQueue.java |  2 +-
 .../transport/server/TransportServiceImpl.java     |  2 +-
 .../iotdb/db/sync/pipedata/PipeDataTest.java       | 12 +++++------
 7 files changed, 54 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
index 631d829128..c1b3719cc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
@@ -37,6 +37,10 @@ public class DeletionPipeData extends PipeData {
 
   private Deletion deletion;
 
+  public DeletionPipeData() {
+    super();
+  }
+
   public DeletionPipeData(Deletion deletion, long serialNumber) {
     super(serialNumber);
     this.deletion = deletion;
@@ -52,11 +56,9 @@ public class DeletionPipeData extends PipeData {
     return super.serialize(stream) + deletion.serializeWithoutFileOffset(stream);
   }
 
-  public static DeletionPipeData deserialize(DataInputStream stream)
-      throws IOException, IllegalPathException {
-    long serialNumber = stream.readLong();
-    Deletion deletion = Deletion.deserializeWithoutFileOffset(stream);
-    return new DeletionPipeData(deletion, serialNumber);
+  public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
+    super.deserialize(stream);
+    deletion = Deletion.deserializeWithoutFileOffset(stream);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
index 7d0b6a3167..7d9adf1249 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
@@ -36,6 +36,8 @@ public abstract class PipeData {
 
   protected long serialNumber;
 
+  public PipeData() {}
+
   public PipeData(long serialNumber) {
     this.serialNumber = serialNumber;
   }
@@ -65,25 +67,35 @@ public abstract class PipeData {
     return byteStream.toByteArray();
   }
 
-  public static PipeData deserialize(DataInputStream stream)
+  public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
+    serialNumber = stream.readLong();
+  }
+
+  public static PipeData createPipeData(DataInputStream stream)
       throws IOException, IllegalPathException {
+    PipeData pipeData;
     PipeDataType type = PipeDataType.values()[stream.readByte()];
     switch (type) {
       case TSFILE:
-        return TsFilePipeData.deserialize(stream);
+        pipeData = new TsFilePipeData();
+        break;
       case DELETION:
-        return DeletionPipeData.deserialize(stream);
+        pipeData = new DeletionPipeData();
+        break;
       case SCHEMA:
-        return SchemaPipeData.deserialize(stream);
+        pipeData = new SchemaPipeData();
+        break;
       default:
         logger.error("Deserialize PipeData error because Unknown type {}.", type);
         throw new UnsupportedOperationException(
             "Deserialize PipeData error because Unknown type " + type);
     }
+    pipeData.deserialize(stream);
+    return pipeData;
   }
 
-  public static PipeData deserialize(byte[] bytes) throws IllegalPathException, IOException {
-    return deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+  public static PipeData createPipeData(byte[] bytes) throws IllegalPathException, IOException {
+    return createPipeData(new DataInputStream(new ByteArrayInputStream(bytes)));
   }
 
   public abstract ILoader createLoader();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
index 5dc1e7c104..ee1d342f24 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
@@ -35,6 +35,10 @@ public class SchemaPipeData extends PipeData {
 
   private PhysicalPlan plan;
 
+  public SchemaPipeData() {
+    super();
+  }
+
   public SchemaPipeData(PhysicalPlan plan, long serialNumber) {
     super(serialNumber);
     this.plan = plan;
@@ -64,13 +68,11 @@ public class SchemaPipeData extends PipeData {
     return bytes;
   }
 
-  public static SchemaPipeData deserialize(DataInputStream stream)
-      throws IOException, IllegalPathException {
-    long serialNumber = stream.readLong();
+  public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
+    super.deserialize(stream);
     byte[] bytes = new byte[stream.readInt()];
     stream.read(bytes);
-    PhysicalPlan plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
-    return new SchemaPipeData(plan, serialNumber);
+    plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
index ac01553b3e..7fbdcadd23 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
@@ -19,6 +19,7 @@
  */
 package org.apache.iotdb.db.sync.pipedata;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.sync.conf.SyncConstant;
@@ -45,6 +46,10 @@ public class TsFilePipeData extends PipeData {
   private String tsFileName;
   private String storageGroupName;
 
+  public TsFilePipeData() {
+    super();
+  }
+
   public TsFilePipeData(String tsFilePath, long serialNumber) {
     super(serialNumber);
     String sep = File.separator.equals("\\") ? "\\\\" : File.separator;
@@ -115,11 +120,15 @@ public class TsFilePipeData extends PipeData {
         + ReadWriteIOUtils.write(tsFileName, stream);
   }
 
-  public static TsFilePipeData deserialize(DataInputStream stream) throws IOException {
-    long serialNumber = stream.readLong();
-    String parentDirPath = ReadWriteIOUtils.readString(stream);
-    String tsFileName = ReadWriteIOUtils.readString(stream);
-    return new TsFilePipeData(parentDirPath == null ? "" : parentDirPath, tsFileName, serialNumber);
+  @Override
+  public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
+    super.deserialize(stream);
+    parentDirPath = ReadWriteIOUtils.readString(stream);
+    if (parentDirPath == null) {
+      parentDirPath = "";
+    }
+    tsFileName = ReadWriteIOUtils.readString(stream);
+    initStorageGroupName();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
index 7df0161272..45f3af881c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
@@ -434,7 +434,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
     List<PipeData> pipeData = new ArrayList<>();
     try (DataInputStream inputStream = new DataInputStream(new FileInputStream(file))) {
       while (true) {
-        pipeData.add(PipeData.deserialize(inputStream));
+        pipeData.add(PipeData.createPipeData(inputStream));
       }
     } catch (EOFException e) {
     } catch (IllegalPathException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
index e52560db24..1713b4e2db 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
@@ -253,7 +253,7 @@ public class TransportServiceImpl implements TransportService.Iface {
       byte[] byteArray = new byte[length];
       buff.get(byteArray);
       try {
-        PipeData pipeData = PipeData.deserialize(byteArray);
+        PipeData pipeData = PipeData.createPipeData(byteArray);
         if (type == Type.TSFILE) {
           // Do with file
           handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java
index 8dd91f78ff..a9498eda6e 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java
@@ -70,19 +70,19 @@ public class PipeDataTest {
       pipeData1.serialize(outputStream);
       outputStream.flush();
       DataInputStream inputStream = new DataInputStream(new FileInputStream(f1));
-      Assert.assertEquals(pipeData1, PipeData.deserialize(inputStream));
+      Assert.assertEquals(pipeData1, PipeData.createPipeData(inputStream));
       pipeData2.serialize(outputStream);
       outputStream.flush();
-      Assert.assertEquals(pipeData2, PipeData.deserialize(inputStream));
+      Assert.assertEquals(pipeData2, PipeData.createPipeData(inputStream));
       pipeData3.serialize(outputStream);
       outputStream.flush();
-      Assert.assertEquals(pipeData3, PipeData.deserialize(inputStream));
+      Assert.assertEquals(pipeData3, PipeData.createPipeData(inputStream));
       inputStream.close();
       outputStream.close();
 
-      Assert.assertEquals(pipeData1, PipeData.deserialize(pipeData1.serialize()));
-      Assert.assertEquals(pipeData2, PipeData.deserialize(pipeData2.serialize()));
-      Assert.assertEquals(pipeData3, PipeData.deserialize(pipeData3.serialize()));
+      Assert.assertEquals(pipeData1, PipeData.createPipeData(pipeData1.serialize()));
+      Assert.assertEquals(pipeData2, PipeData.createPipeData(pipeData2.serialize()));
+      Assert.assertEquals(pipeData3, PipeData.createPipeData(pipeData3.serialize()));
     } catch (Exception e) {
       logger.error(e.getMessage());
       Assert.fail();