You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/23 05:25:20 UTC

[iotdb] branch master updated: [IOTDB-2979] Optimize the serialization and deserialization of thrift data structures (#5637)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 949abc4294 [IOTDB-2979] Optimize the serialization and deserialization of thrift data structures (#5637)
949abc4294 is described below

commit 949abc42948a5cd2f50b0fb5e2a45d4dbe0e843b
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sat Apr 23 13:25:16 2022 +0800

    [IOTDB-2979] Optimize the serialization and deserialization of thrift data structures (#5637)
    
    * done
    
    * debug
    
    Co-authored-by: CRZbulabula <cr...@gmail.com>
---
 .../exception/runtime/ThriftSerDeException.java    |  26 ++++
 .../commons/utils/ThriftCommonsSerDeUtils.java     | 147 ++++++++++++---------
 .../commons/utils/ThriftConfigNodeSerDeUtils.java  |  72 ++++------
 .../node/process/GroupByLevelNodeSerdeTest.java    |   2 +-
 .../sql/plan/node/process/LimitNodeSerdeTest.java  |   2 +-
 .../iotdb/db/service/InternalServiceImplTest.java  |   4 +
 6 files changed, 143 insertions(+), 110 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/runtime/ThriftSerDeException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/runtime/ThriftSerDeException.java
new file mode 100644
index 0000000000..c138b60e07
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/runtime/ThriftSerDeException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.exception.runtime;
+
+public class ThriftSerDeException extends RuntimeException {
+  /** Builds the message as {@code msg + e} and keeps {@code e} as the cause for stack traces. */
+  public ThriftSerDeException(String msg, Throwable e) {
+    super(msg + e, e);
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
index 1b40fa84a6..4c665fca83 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
@@ -19,18 +19,21 @@
 package org.apache.iotdb.commons.utils;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
-// TODO: Serialize and deserialize by thrift structure
 /** Utils for serialize and deserialize all the data struct defined by thrift-commons */
 public class ThriftCommonsSerDeUtils {
 
@@ -38,107 +41,125 @@ public class ThriftCommonsSerDeUtils {
     // Empty constructor
   }
 
+  private static TBinaryProtocol generateWriteProtocol(ByteBuffer buffer)
+      throws TTransportException {
+    // Binary protocol over a TByteBuffer transport that wraps the caller's buffer.
+    return new TBinaryProtocol(new TByteBuffer(buffer));
+  }
+
+  private static TBinaryProtocol generateReadProtocol(ByteBuffer buffer)
+      throws TTransportException {
+    // Same construction as the write side: a TByteBuffer transport around the given buffer.
+    return new TBinaryProtocol(new TByteBuffer(buffer));
+  }
+
   public static void writeTEndPoint(TEndPoint endPoint, ByteBuffer buffer) {
-    BasicStructureSerDeUtil.write(endPoint.getIp(), buffer);
-    buffer.putInt(endPoint.getPort());
+    try {
+      endPoint.write(generateWriteProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Write TEndPoint failed: ", e);
+    }
   }
 
   public static TEndPoint readTEndPoint(ByteBuffer buffer) {
     TEndPoint endPoint = new TEndPoint();
-    endPoint.setIp(BasicStructureSerDeUtil.readString(buffer));
-    endPoint.setPort(buffer.getInt());
+    try {
+      endPoint.read(generateReadProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Read TEndPoint failed: ", e);
+    }
     return endPoint;
   }
 
   public static void writeTDataNodeLocation(TDataNodeLocation dataNodeLocation, ByteBuffer buffer) {
-    buffer.putInt(dataNodeLocation.getDataNodeId());
-
-    buffer.put(dataNodeLocation.isSetExternalEndPoint() ? (byte) 1 : (byte) 0);
-    if (dataNodeLocation.isSetExternalEndPoint()) {
-      writeTEndPoint(dataNodeLocation.getExternalEndPoint(), buffer);
-    }
-
-    buffer.put(dataNodeLocation.isSetInternalEndPoint() ? (byte) 1 : (byte) 0);
-    if (dataNodeLocation.isSetInternalEndPoint()) {
-      writeTEndPoint(dataNodeLocation.getInternalEndPoint(), buffer);
-    }
-
-    buffer.put(dataNodeLocation.isSetDataBlockManagerEndPoint() ? (byte) 1 : (byte) 0);
-    if (dataNodeLocation.isSetDataBlockManagerEndPoint()) {
-      writeTEndPoint(dataNodeLocation.getDataBlockManagerEndPoint(), buffer);
-    }
-
-    buffer.put(dataNodeLocation.isSetConsensusEndPoint() ? (byte) 1 : (byte) 0);
-    if (dataNodeLocation.isSetConsensusEndPoint()) {
-      writeTEndPoint(dataNodeLocation.getConsensusEndPoint(), buffer);
+    try {
+      dataNodeLocation.write(generateWriteProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Write TDataNodeLocation failed: ", e);
     }
   }
 
   public static TDataNodeLocation readTDataNodeLocation(ByteBuffer buffer) {
     TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
-    dataNodeLocation.setDataNodeId(buffer.getInt());
-
-    if (buffer.get() > 0) {
-      dataNodeLocation.setExternalEndPoint(readTEndPoint(buffer));
-    }
-
-    if (buffer.get() > 0) {
-      dataNodeLocation.setInternalEndPoint(readTEndPoint(buffer));
-    }
-
-    if (buffer.get() > 0) {
-      dataNodeLocation.setDataBlockManagerEndPoint(readTEndPoint(buffer));
-    }
-
-    if (buffer.get() > 0) {
-      dataNodeLocation.setConsensusEndPoint(readTEndPoint(buffer));
+    try {
+      dataNodeLocation.read(generateReadProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Read TDataNodeLocation failed: ", e);
     }
     return dataNodeLocation;
   }
 
   public static void writeTSeriesPartitionSlot(
       TSeriesPartitionSlot seriesPartitionSlot, ByteBuffer buffer) {
-    buffer.putInt(seriesPartitionSlot.getSlotId());
+    try {
+      seriesPartitionSlot.write(generateWriteProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Write TSeriesPartitionSlot failed: ", e);
+    }
   }
 
   public static TSeriesPartitionSlot readTSeriesPartitionSlot(ByteBuffer buffer) {
-    return new TSeriesPartitionSlot(buffer.getInt());
+    TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot();
+    try {
+      seriesPartitionSlot.read(generateReadProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Read TSeriesPartitionSlot failed: ", e);
+    }
+    return seriesPartitionSlot;
   }
 
   public static void writeTTimePartitionSlot(
       TTimePartitionSlot timePartitionSlot, ByteBuffer buffer) {
-    buffer.putLong(timePartitionSlot.getStartTime());
+    try {
+      timePartitionSlot.write(generateWriteProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Write TTimePartitionSlot failed: ", e);
+    }
   }
 
   public static TTimePartitionSlot readTTimePartitionSlot(ByteBuffer buffer) {
-    return new TTimePartitionSlot(buffer.getLong());
+    TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
+    try {
+      timePartitionSlot.read(generateReadProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Read TTimePartitionSlot failed: ", e);
+    }
+    return timePartitionSlot;
   }
 
   public static void writeTConsensusGroupId(TConsensusGroupId consensusGroupId, ByteBuffer buffer) {
-    buffer.putInt(consensusGroupId.getType().ordinal());
-    buffer.putInt(consensusGroupId.getId());
+    try {
+      consensusGroupId.write(generateWriteProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Write TConsensusGroupId failed: ", e);
+    }
   }
 
   public static TConsensusGroupId readTConsensusGroupId(ByteBuffer buffer) {
-    TConsensusGroupType type = TConsensusGroupType.values()[buffer.getInt()];
-    return new TConsensusGroupId(type, buffer.getInt());
+    TConsensusGroupId consensusGroupId = new TConsensusGroupId();
+    try {
+      consensusGroupId.read(generateReadProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Read TConsensusGroupId failed: ", e);
+    }
+    return consensusGroupId;
   }
 
   public static void writeTRegionReplicaSet(TRegionReplicaSet regionReplicaSet, ByteBuffer buffer) {
-    writeTConsensusGroupId(regionReplicaSet.getRegionId(), buffer);
-    buffer.putInt(regionReplicaSet.getDataNodeLocationsSize());
-    regionReplicaSet
-        .getDataNodeLocations()
-        .forEach(dataNodeLocation -> writeTDataNodeLocation(dataNodeLocation, buffer));
+    try {
+      regionReplicaSet.write(generateWriteProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Write TRegionReplicaSet failed: ", e);
+    }
   }
 
   public static TRegionReplicaSet readTRegionReplicaSet(ByteBuffer buffer) {
-    TConsensusGroupId consensusGroupId = readTConsensusGroupId(buffer);
-    int dataNodeLocationNum = buffer.getInt();
-    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
-    for (int i = 0; i < dataNodeLocationNum; i++) {
-      dataNodeLocations.add(readTDataNodeLocation(buffer));
+    TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+    try {
+      regionReplicaSet.read(generateReadProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Read TRegionReplicaSet failed: ", e);
     }
-    return new TRegionReplicaSet(consensusGroupId, dataNodeLocations);
+    return regionReplicaSet;
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
index 85122f55ad..d8f0c58772 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
@@ -18,12 +18,17 @@
  */
 package org.apache.iotdb.commons.utils;
 
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 
-// TODO: Serialize and deserialize by thrift structure
 /** Utils for serialize and deserialize all the data struct defined by thrift-confignode */
 public class ThriftConfigNodeSerDeUtils {
 
@@ -31,57 +36,34 @@ public class ThriftConfigNodeSerDeUtils {
     // Empty constructor
   }
 
-  public static void writeTStorageGroupSchema(
-      TStorageGroupSchema storageGroupSchema, ByteBuffer buffer) {
-    BasicStructureSerDeUtil.write(storageGroupSchema.getName(), buffer);
-    buffer.putLong(storageGroupSchema.getTTL());
-    buffer.putInt(storageGroupSchema.getSchemaReplicationFactor());
-    buffer.putInt(storageGroupSchema.getDataReplicationFactor());
-    buffer.putLong(storageGroupSchema.getTimePartitionInterval());
+  private static TBinaryProtocol generateWriteProtocol(ByteBuffer buffer)
+      throws TTransportException {
+    // Binary protocol over a TByteBuffer transport that wraps the caller's buffer.
+    return new TBinaryProtocol(new TByteBuffer(buffer));
+  }
 
-    buffer.putInt(storageGroupSchema.getSchemaRegionGroupIdsSize());
-    if (storageGroupSchema.getSchemaRegionGroupIdsSize() > 0) {
-      storageGroupSchema
-          .getSchemaRegionGroupIds()
-          .forEach(
-              schemaRegionGroupId ->
-                  ThriftCommonsSerDeUtils.writeTConsensusGroupId(schemaRegionGroupId, buffer));
-    }
+  private static TBinaryProtocol generateReadProtocol(ByteBuffer buffer)
+      throws TTransportException {
+    // Same construction as the write side: a TByteBuffer transport around the given buffer.
+    return new TBinaryProtocol(new TByteBuffer(buffer));
+  }
 
-    buffer.putInt(storageGroupSchema.getDataRegionGroupIdsSize());
-    if (storageGroupSchema.getDataRegionGroupIdsSize() > 0) {
-      storageGroupSchema
-          .getDataRegionGroupIds()
-          .forEach(
-              dataRegionGroupId ->
-                  ThriftCommonsSerDeUtils.writeTConsensusGroupId(dataRegionGroupId, buffer));
+  public static void writeTStorageGroupSchema(
+      TStorageGroupSchema storageGroupSchema, ByteBuffer buffer) {
+    try {
+      storageGroupSchema.write(generateWriteProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Write TStorageGroupSchema failed: ", e);
     }
   }
 
   public static TStorageGroupSchema readTStorageGroupSchema(ByteBuffer buffer) {
     TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
-    storageGroupSchema.setName(BasicStructureSerDeUtil.readString(buffer));
-    storageGroupSchema.setTTL(buffer.getLong());
-    storageGroupSchema.setSchemaReplicationFactor(buffer.getInt());
-    storageGroupSchema.setDataReplicationFactor(buffer.getInt());
-    storageGroupSchema.setTimePartitionInterval(buffer.getLong());
-
-    int groupIdNum = buffer.getInt();
-    storageGroupSchema.setSchemaRegionGroupIds(new ArrayList<>());
-    for (int i = 0; i < groupIdNum; i++) {
-      storageGroupSchema
-          .getSchemaRegionGroupIds()
-          .add(ThriftCommonsSerDeUtils.readTConsensusGroupId(buffer));
+    try {
+      storageGroupSchema.read(generateReadProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Read TStorageGroupSchema failed: ", e);
     }
-
-    groupIdNum = buffer.getInt();
-    storageGroupSchema.setDataRegionGroupIds(new ArrayList<>());
-    for (int i = 0; i < groupIdNum; i++) {
-      storageGroupSchema
-          .getDataRegionGroupIds()
-          .add(ThriftCommonsSerDeUtils.readTConsensusGroupId(buffer));
-    }
-
     return storageGroupSchema;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
index 8a5a61e82f..bbe4f15dd6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -119,7 +119,7 @@ public class GroupByLevelNodeSerdeTest {
             new int[] {1, 3},
             groupedPathMap);
 
-    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
     groupByLevelNode.serialize(byteBuffer);
     byteBuffer.flip();
     assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), groupByLevelNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
index 3e1066dd86..0084cf9545 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
@@ -121,7 +121,7 @@ public class LimitNodeSerdeTest {
             groupedPathMap);
 
     LimitNode limitNode = new LimitNode(new PlanNodeId("TestLimitNode"), groupByLevelNode, 3);
-    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
     limitNode.serialize(byteBuffer);
     byteBuffer.flip();
     assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), limitNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index bf0dbad703..255b73f5a7 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -256,6 +256,10 @@ public class InternalServiceImplTest {
     List<TDataNodeLocation> dataNodeList = new ArrayList<>();
     dataNodeList.add(
         new TDataNodeLocation()
+            .setExternalEndPoint(new TEndPoint(conf.getRpcAddress(), conf.getRpcPort()))
+            .setInternalEndPoint(new TEndPoint(conf.getInternalIp(), conf.getInternalPort()))
+            .setDataBlockManagerEndPoint(
+                new TEndPoint(conf.getInternalIp(), conf.getDataBlockManagerPort()))
             .setConsensusEndPoint(new TEndPoint(conf.getInternalIp(), conf.getConsensusPort())));
 
     // construct fragmentInstance