You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/23 05:25:20 UTC
[iotdb] branch master updated: [IOTDB-2979] Optimize the serialization and deserialization of thrift data structures (#5637)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 949abc4294 [IOTDB-2979] Optimize the serialization and deserialization of thrift data structures (#5637)
949abc4294 is described below
commit 949abc42948a5cd2f50b0fb5e2a45d4dbe0e843b
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sat Apr 23 13:25:16 2022 +0800
[IOTDB-2979] Optimize the serialization and deserialization of thrift data structures (#5637)
* done
* debug
Co-authored-by: CRZbulabula <cr...@gmail.com>
---
.../exception/runtime/ThriftSerDeException.java | 26 ++++
.../commons/utils/ThriftCommonsSerDeUtils.java | 147 ++++++++++++---------
.../commons/utils/ThriftConfigNodeSerDeUtils.java | 72 ++++------
.../node/process/GroupByLevelNodeSerdeTest.java | 2 +-
.../sql/plan/node/process/LimitNodeSerdeTest.java | 2 +-
.../iotdb/db/service/InternalServiceImplTest.java | 4 +
6 files changed, 143 insertions(+), 110 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/exception/runtime/ThriftSerDeException.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/runtime/ThriftSerDeException.java
new file mode 100644
index 0000000000..c138b60e07
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/runtime/ThriftSerDeException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.exception.runtime;
+
+/**
+ * Unchecked exception thrown when serializing or deserializing a thrift structure fails.
+ */
+public class ThriftSerDeException extends RuntimeException {
+
+ /**
+ * Creates the exception with the message {@code msg} followed by the string form of {@code e},
+ * and records {@code e} as the cause so the original stack trace is not lost.
+ *
+ * @param msg description of the failed operation, used as the message prefix
+ * @param e the underlying failure; a {@code null} value is rendered as "null" in the message
+ */
+ public ThriftSerDeException(String msg, Throwable e) {
+ // String concatenation renders e exactly as e.toString() would, without an NPE on null.
+ super(msg + e, e);
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
index 1b40fa84a6..4c665fca83 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
@@ -19,18 +19,21 @@
package org.apache.iotdb.commons.utils;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-// TODO: Serialize and deserialize by thrift structure
/** Utils for serialize and deserialize all the data struct defined by thrift-commons */
public class ThriftCommonsSerDeUtils {
@@ -38,107 +41,125 @@ public class ThriftCommonsSerDeUtils {
// Empty constructor
}
+ /**
+ * Builds the binary protocol that the write methods of this class serialize into.
+ *
+ * @param buffer the buffer to wrap in a {@link TByteBuffer} transport
+ * @return a {@link TBinaryProtocol} layered on that transport
+ * @throws TTransportException if constructing the transport or protocol fails
+ */
+ private static TBinaryProtocol generateWriteProtocol(ByteBuffer buffer)
+ throws TTransportException {
+ final TTransport byteBufferTransport = new TByteBuffer(buffer);
+ final TBinaryProtocol binaryProtocol = new TBinaryProtocol(byteBufferTransport);
+ return binaryProtocol;
+ }
+
+ /**
+ * Builds the binary protocol that the read methods of this class deserialize from.
+ *
+ * @param buffer the buffer to wrap in a {@link TByteBuffer} transport
+ * @return a {@link TBinaryProtocol} layered on that transport
+ * @throws TTransportException if constructing the transport or protocol fails
+ */
+ private static TBinaryProtocol generateReadProtocol(ByteBuffer buffer)
+ throws TTransportException {
+ final TTransport byteBufferTransport = new TByteBuffer(buffer);
+ final TBinaryProtocol binaryProtocol = new TBinaryProtocol(byteBufferTransport);
+ return binaryProtocol;
+ }
+
public static void writeTEndPoint(TEndPoint endPoint, ByteBuffer buffer) {
- BasicStructureSerDeUtil.write(endPoint.getIp(), buffer);
- buffer.putInt(endPoint.getPort());
+ try {
+ endPoint.write(generateWriteProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TEndPoint failed: ", e);
+ }
}
public static TEndPoint readTEndPoint(ByteBuffer buffer) {
TEndPoint endPoint = new TEndPoint();
- endPoint.setIp(BasicStructureSerDeUtil.readString(buffer));
- endPoint.setPort(buffer.getInt());
+ try {
+ endPoint.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TEndPoint failed: ", e);
+ }
return endPoint;
}
public static void writeTDataNodeLocation(TDataNodeLocation dataNodeLocation, ByteBuffer buffer) {
- buffer.putInt(dataNodeLocation.getDataNodeId());
-
- buffer.put(dataNodeLocation.isSetExternalEndPoint() ? (byte) 1 : (byte) 0);
- if (dataNodeLocation.isSetExternalEndPoint()) {
- writeTEndPoint(dataNodeLocation.getExternalEndPoint(), buffer);
- }
-
- buffer.put(dataNodeLocation.isSetInternalEndPoint() ? (byte) 1 : (byte) 0);
- if (dataNodeLocation.isSetInternalEndPoint()) {
- writeTEndPoint(dataNodeLocation.getInternalEndPoint(), buffer);
- }
-
- buffer.put(dataNodeLocation.isSetDataBlockManagerEndPoint() ? (byte) 1 : (byte) 0);
- if (dataNodeLocation.isSetDataBlockManagerEndPoint()) {
- writeTEndPoint(dataNodeLocation.getDataBlockManagerEndPoint(), buffer);
- }
-
- buffer.put(dataNodeLocation.isSetConsensusEndPoint() ? (byte) 1 : (byte) 0);
- if (dataNodeLocation.isSetConsensusEndPoint()) {
- writeTEndPoint(dataNodeLocation.getConsensusEndPoint(), buffer);
+ try {
+ dataNodeLocation.write(generateWriteProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TDataNodeLocation failed: ", e);
}
}
public static TDataNodeLocation readTDataNodeLocation(ByteBuffer buffer) {
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
- dataNodeLocation.setDataNodeId(buffer.getInt());
-
- if (buffer.get() > 0) {
- dataNodeLocation.setExternalEndPoint(readTEndPoint(buffer));
- }
-
- if (buffer.get() > 0) {
- dataNodeLocation.setInternalEndPoint(readTEndPoint(buffer));
- }
-
- if (buffer.get() > 0) {
- dataNodeLocation.setDataBlockManagerEndPoint(readTEndPoint(buffer));
- }
-
- if (buffer.get() > 0) {
- dataNodeLocation.setConsensusEndPoint(readTEndPoint(buffer));
+ try {
+ dataNodeLocation.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TDataNodeLocation failed: ", e);
}
return dataNodeLocation;
}
public static void writeTSeriesPartitionSlot(
TSeriesPartitionSlot seriesPartitionSlot, ByteBuffer buffer) {
- buffer.putInt(seriesPartitionSlot.getSlotId());
+ try {
+ seriesPartitionSlot.write(generateWriteProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TSeriesPartitionSlot failed: ", e);
+ }
}
public static TSeriesPartitionSlot readTSeriesPartitionSlot(ByteBuffer buffer) {
- return new TSeriesPartitionSlot(buffer.getInt());
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot();
+ try {
+ seriesPartitionSlot.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TSeriesPartitionSlot failed: ", e);
+ }
+ return seriesPartitionSlot;
}
public static void writeTTimePartitionSlot(
TTimePartitionSlot timePartitionSlot, ByteBuffer buffer) {
- buffer.putLong(timePartitionSlot.getStartTime());
+ try {
+ timePartitionSlot.write(generateWriteProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TTimePartitionSlot failed: ", e);
+ }
}
public static TTimePartitionSlot readTTimePartitionSlot(ByteBuffer buffer) {
- return new TTimePartitionSlot(buffer.getLong());
+ TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
+ try {
+ timePartitionSlot.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TTimePartitionSlot failed: ", e);
+ }
+ return timePartitionSlot;
}
public static void writeTConsensusGroupId(TConsensusGroupId consensusGroupId, ByteBuffer buffer) {
- buffer.putInt(consensusGroupId.getType().ordinal());
- buffer.putInt(consensusGroupId.getId());
+ try {
+ consensusGroupId.write(generateWriteProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TConsensusGroupId failed: ", e);
+ }
}
public static TConsensusGroupId readTConsensusGroupId(ByteBuffer buffer) {
- TConsensusGroupType type = TConsensusGroupType.values()[buffer.getInt()];
- return new TConsensusGroupId(type, buffer.getInt());
+ TConsensusGroupId consensusGroupId = new TConsensusGroupId();
+ try {
+ consensusGroupId.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TConsensusGroupId failed: ", e);
+ }
+ return consensusGroupId;
}
public static void writeTRegionReplicaSet(TRegionReplicaSet regionReplicaSet, ByteBuffer buffer) {
- writeTConsensusGroupId(regionReplicaSet.getRegionId(), buffer);
- buffer.putInt(regionReplicaSet.getDataNodeLocationsSize());
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(dataNodeLocation -> writeTDataNodeLocation(dataNodeLocation, buffer));
+ try {
+ regionReplicaSet.write(generateWriteProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TRegionReplicaSet failed: ", e);
+ }
}
public static TRegionReplicaSet readTRegionReplicaSet(ByteBuffer buffer) {
- TConsensusGroupId consensusGroupId = readTConsensusGroupId(buffer);
- int dataNodeLocationNum = buffer.getInt();
- List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
- for (int i = 0; i < dataNodeLocationNum; i++) {
- dataNodeLocations.add(readTDataNodeLocation(buffer));
+ TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+ try {
+ regionReplicaSet.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TRegionReplicaSet failed: ", e);
}
- return new TRegionReplicaSet(consensusGroupId, dataNodeLocations);
+ return regionReplicaSet;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
index 85122f55ad..d8f0c58772 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
@@ -18,12 +18,17 @@
*/
package org.apache.iotdb.commons.utils;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-// TODO: Serialize and deserialize by thrift structure
/** Utils for serialize and deserialize all the data struct defined by thrift-confignode */
public class ThriftConfigNodeSerDeUtils {
@@ -31,57 +36,34 @@ public class ThriftConfigNodeSerDeUtils {
// Empty constructor
}
- public static void writeTStorageGroupSchema(
- TStorageGroupSchema storageGroupSchema, ByteBuffer buffer) {
- BasicStructureSerDeUtil.write(storageGroupSchema.getName(), buffer);
- buffer.putLong(storageGroupSchema.getTTL());
- buffer.putInt(storageGroupSchema.getSchemaReplicationFactor());
- buffer.putInt(storageGroupSchema.getDataReplicationFactor());
- buffer.putLong(storageGroupSchema.getTimePartitionInterval());
+ /**
+ * Builds the binary protocol that {@code writeTStorageGroupSchema} serializes into.
+ *
+ * @param buffer the buffer to wrap in a {@link TByteBuffer} transport
+ * @return a {@link TBinaryProtocol} layered on that transport
+ * @throws TTransportException if constructing the transport or protocol fails
+ */
+ private static TBinaryProtocol generateWriteProtocol(ByteBuffer buffer)
+ throws TTransportException {
+ final TTransport byteBufferTransport = new TByteBuffer(buffer);
+ final TBinaryProtocol binaryProtocol = new TBinaryProtocol(byteBufferTransport);
+ return binaryProtocol;
+ }
- buffer.putInt(storageGroupSchema.getSchemaRegionGroupIdsSize());
- if (storageGroupSchema.getSchemaRegionGroupIdsSize() > 0) {
- storageGroupSchema
- .getSchemaRegionGroupIds()
- .forEach(
- schemaRegionGroupId ->
- ThriftCommonsSerDeUtils.writeTConsensusGroupId(schemaRegionGroupId, buffer));
- }
+ /**
+ * Builds the binary protocol that {@code readTStorageGroupSchema} deserializes from.
+ *
+ * @param buffer the buffer to wrap in a {@link TByteBuffer} transport
+ * @return a {@link TBinaryProtocol} layered on that transport
+ * @throws TTransportException if constructing the transport or protocol fails
+ */
+ private static TBinaryProtocol generateReadProtocol(ByteBuffer buffer)
+ throws TTransportException {
+ final TTransport byteBufferTransport = new TByteBuffer(buffer);
+ final TBinaryProtocol binaryProtocol = new TBinaryProtocol(byteBufferTransport);
+ return binaryProtocol;
+ }
- buffer.putInt(storageGroupSchema.getDataRegionGroupIdsSize());
- if (storageGroupSchema.getDataRegionGroupIdsSize() > 0) {
- storageGroupSchema
- .getDataRegionGroupIds()
- .forEach(
- dataRegionGroupId ->
- ThriftCommonsSerDeUtils.writeTConsensusGroupId(dataRegionGroupId, buffer));
+ public static void writeTStorageGroupSchema(
+ TStorageGroupSchema storageGroupSchema, ByteBuffer buffer) {
+ try {
+ storageGroupSchema.write(generateWriteProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TStorageGroupSchema failed: ", e);
}
}
public static TStorageGroupSchema readTStorageGroupSchema(ByteBuffer buffer) {
TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
- storageGroupSchema.setName(BasicStructureSerDeUtil.readString(buffer));
- storageGroupSchema.setTTL(buffer.getLong());
- storageGroupSchema.setSchemaReplicationFactor(buffer.getInt());
- storageGroupSchema.setDataReplicationFactor(buffer.getInt());
- storageGroupSchema.setTimePartitionInterval(buffer.getLong());
-
- int groupIdNum = buffer.getInt();
- storageGroupSchema.setSchemaRegionGroupIds(new ArrayList<>());
- for (int i = 0; i < groupIdNum; i++) {
- storageGroupSchema
- .getSchemaRegionGroupIds()
- .add(ThriftCommonsSerDeUtils.readTConsensusGroupId(buffer));
+ try {
+ storageGroupSchema.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TStorageGroupSchema failed: ", e);
}
-
- groupIdNum = buffer.getInt();
- storageGroupSchema.setDataRegionGroupIds(new ArrayList<>());
- for (int i = 0; i < groupIdNum; i++) {
- storageGroupSchema
- .getDataRegionGroupIds()
- .add(ThriftCommonsSerDeUtils.readTConsensusGroupId(buffer));
- }
-
return storageGroupSchema;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
index 8a5a61e82f..bbe4f15dd6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -119,7 +119,7 @@ public class GroupByLevelNodeSerdeTest {
new int[] {1, 3},
groupedPathMap);
- ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
groupByLevelNode.serialize(byteBuffer);
byteBuffer.flip();
assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), groupByLevelNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
index 3e1066dd86..0084cf9545 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
@@ -121,7 +121,7 @@ public class LimitNodeSerdeTest {
groupedPathMap);
LimitNode limitNode = new LimitNode(new PlanNodeId("TestLimitNode"), groupByLevelNode, 3);
- ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
limitNode.serialize(byteBuffer);
byteBuffer.flip();
assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), limitNode);
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index bf0dbad703..255b73f5a7 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -256,6 +256,10 @@ public class InternalServiceImplTest {
List<TDataNodeLocation> dataNodeList = new ArrayList<>();
dataNodeList.add(
new TDataNodeLocation()
+ .setExternalEndPoint(new TEndPoint(conf.getRpcAddress(), conf.getRpcPort()))
+ .setInternalEndPoint(new TEndPoint(conf.getInternalIp(), conf.getInternalPort()))
+ .setDataBlockManagerEndPoint(
+ new TEndPoint(conf.getInternalIp(), conf.getDataBlockManagerPort()))
.setConsensusEndPoint(new TEndPoint(conf.getInternalIp(), conf.getConsensusPort())));
// construct fragmentInstance