You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/01/11 13:24:02 UTC
[iotdb] branch WalDirectBuffer updated: use netty buffer
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch WalDirectBuffer
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/WalDirectBuffer by this push:
new 704b352 use netty buffer
704b352 is described below
commit 704b35270d172e0b27c93a8e3cd6fd5e7cda4bfa
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon Jan 11 21:23:29 2021 +0800
use netty buffer
---
server/pom.xml | 5 +
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 23 +++-
.../iotdb/db/qp/physical/crud/DeletePlan.java | 15 +++
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 70 ++++++++++++
.../physical/crud/InsertRowsOfOneDevicePlan.java | 15 +++
.../db/qp/physical/crud/InsertTabletPlan.java | 123 +++++++++++++++++++++
.../iotdb/db/qp/physical/sys/AuthorPlan.java | 28 +++++
.../iotdb/db/qp/physical/sys/ChangeAliasPlan.java | 18 ++-
.../db/qp/physical/sys/ChangeTagOffsetPlan.java | 18 ++-
.../iotdb/db/qp/physical/sys/CreateIndexPlan.java | 23 ++++
.../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 58 ++++++++++
.../db/qp/physical/sys/CreateTimeSeriesPlan.java | 47 ++++++++
.../iotdb/db/qp/physical/sys/DataAuthPlan.java | 14 +++
.../db/qp/physical/sys/DeleteStorageGroupPlan.java | 13 +++
.../db/qp/physical/sys/DeleteTimeSeriesPlan.java | 13 +++
.../iotdb/db/qp/physical/sys/DropIndexPlan.java | 15 +++
.../apache/iotdb/db/qp/physical/sys/FlushPlan.java | 40 +++++++
.../apache/iotdb/db/qp/physical/sys/MNodePlan.java | 16 ++-
.../db/qp/physical/sys/MeasurementMNodePlan.java | 20 +++-
.../db/qp/physical/sys/SetStorageGroupPlan.java | 8 ++
.../iotdb/db/qp/physical/sys/SetTTLPlan.java | 11 ++
.../db/qp/physical/sys/StorageGroupMNodePlan.java | 16 ++-
.../apache/iotdb/db/writelog/io/ILogWriter.java | 3 +
.../org/apache/iotdb/db/writelog/io/LogWriter.java | 63 +++++++----
.../db/writelog/node/ExclusiveWriteLogNode.java | 46 ++++----
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 2 +-
tsfile/pom.xml | 5 +
.../tsfile/file/metadata/enums/TSDataType.java | 5 +
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 92 +++++++++++++++
.../tsfile/write/schema/MeasurementSchema.java | 25 +++++
30 files changed, 782 insertions(+), 68 deletions(-)
diff --git a/server/pom.xml b/server/pom.xml
index ff38b75..81f058d 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -191,6 +191,11 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>4.1.22.Final</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index e9bf655..a2430fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -30,14 +31,14 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateIndexPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
@@ -139,6 +140,10 @@ public abstract class PhysicalPlan {
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
}
+ public void serialize(ByteBuf buffer) { // ByteBuf overload; this base version is a placeholder that concrete plans override
+ throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED); // reached only when a plan has no ByteBuf serializer
+ }
+
/**
* Deserialize the plan from the given buffer. This is provided for WAL, and must be used with
* serializeToWAL.
@@ -157,12 +162,26 @@ public abstract class PhysicalPlan {
}
}
+ protected void putString(ByteBuf buffer, String value) { // writes a possibly-null string into the ByteBuf
+ if (value == null) {
+ buffer.writeInt(NULL_VALUE_LEN); // a null string is encoded as the single int NULL_VALUE_LEN
+ } else {
+ ReadWriteIOUtils.write(value, buffer); // non-null strings use the shared ReadWriteIOUtils encoding
+ }
+ }
+
protected void putStrings(ByteBuffer buffer, List<String> values) {
for (String value : values) {
putString(buffer, value);
}
}
+ protected void putStrings(ByteBuf buffer, List<String> values) { // writes each string in list order; no element count is written here
+ for (String value : values) {
+ putString(buffer, value); // null elements are handled by putString
+ }
+ }
+
protected void putString(DataOutputStream stream, String value) throws IOException {
if (value == null) {
stream.writeInt(NULL_VALUE_LEN);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index 9b1fe6a..982325a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -148,6 +149,20 @@ public class DeletePlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, start time, end time, path count, paths, index
+ int type = PhysicalPlanType.DELETE.ordinal();
+ buffer.writeByte((byte) type); // the plan type tag is the enum ordinal narrowed to one byte
+ buffer.writeLong(deleteStartTime);
+ buffer.writeLong(deleteEndTime);
+ buffer.writeInt(paths.size()); // count prefix for the path list that follows
+ for (PartialPath path : paths) {
+ putString(buffer, path.getFullPath());
+ }
+
+ buffer.writeLong(index); // the plan index is written last
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
this.deleteStartTime = buffer.getLong();
this.deleteEndTime = buffer.getLong();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index ec32f5f..5b06d01 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -369,6 +370,44 @@ public class InsertRowPlan extends InsertPlan {
}
}
+ private void putValues(ByteBuf buffer) throws QueryProcessException { // writes each value preceded by its type marker; throws on an unsupported type
+ for (int i = 0; i < values.length; i++) {
+ if (measurements[i] == null) { // slots whose measurement is null are not serialized
+ continue;
+ }
+ // types are not determined, the situation mainly occurs when the plan uses string values
+ // and is forwarded to other nodes
+ if (dataTypes == null || dataTypes[i] == null) {
+ ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer); // marker telling the reader the value is an untyped string
+ ReadWriteIOUtils.write((String) values[i], buffer);
+ } else {
+ ReadWriteIOUtils.write(dataTypes[i], buffer); // known type: write the type first, then the value cast to its boxed class
+ switch (dataTypes[i]) {
+ case BOOLEAN:
+ ReadWriteIOUtils.write((Boolean) values[i], buffer);
+ break;
+ case INT32:
+ ReadWriteIOUtils.write((Integer) values[i], buffer);
+ break;
+ case INT64:
+ ReadWriteIOUtils.write((Long) values[i], buffer);
+ break;
+ case FLOAT:
+ ReadWriteIOUtils.write((Float) values[i], buffer);
+ break;
+ case DOUBLE:
+ ReadWriteIOUtils.write((Double) values[i], buffer);
+ break;
+ case TEXT:
+ ReadWriteIOUtils.write((Binary) values[i], buffer);
+ break;
+ default:
+ throw new QueryProcessException("Unsupported data type:" + dataTypes[i]); // note: the type marker has already been written at this point
+ }
+ }
+ }
+ }
+
/**
* Make sure the values is already inited before calling this
*/
@@ -418,6 +457,16 @@ public class InsertRowPlan extends InsertPlan {
serializeMeasurementsAndValues(buffer);
}
+ @Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, timestamp, device path, then measurements and values
+ int type = PhysicalPlanType.INSERT.ordinal();
+ buffer.writeByte((byte) type); // the plan type tag is the enum ordinal narrowed to one byte
+ buffer.writeLong(time);
+
+ putString(buffer, deviceId.getFullPath());
+ serializeMeasurementsAndValues(buffer); // also appends the infer-type flag and the plan index
+ }
+
void serializeMeasurementsAndValues(ByteBuffer buffer) {
buffer
.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
@@ -439,6 +488,27 @@ public class InsertRowPlan extends InsertPlan {
buffer.putLong(index);
}
+ void serializeMeasurementsAndValues(ByteBuf buffer) { // writes measurement count, names, values, infer-type flag and index
+ buffer
+ .writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); // count excludes failed measurements
+
+ for (String measurement : measurements) {
+ if (measurement != null) { // null entries are skipped, matching the skip in putValues
+ putString(buffer, measurement);
+ }
+ }
+
+ try {
+ putValues(buffer);
+ } catch (QueryProcessException e) {
+ logger.error("Failed to serialize values for {}", this, e); // NOTE(review): error is only logged; the flag and index below are still appended after a partial write - confirm this is intended
+ }
+
+ // the types are not inferred before the plan is serialized
+ buffer.writeByte((byte) (isNeedInferType ? 1 : 0));
+ buffer.writeLong(index);
+ }
+
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
this.time = buffer.getLong();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 66bf538..d15736d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -113,6 +114,20 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, device path, row count, then per row: timestamp + measurements/values
+ int type = PhysicalPlanType.INSERT.ordinal(); // NOTE(review): same type tag as InsertRowPlan.serialize although the layout differs - confirm the reader can tell them apart
+ buffer.writeByte((byte) type);
+
+ putString(buffer, deviceId.getFullPath()); // device path is written once for all rows
+
+ buffer.writeInt(rowPlans.length);
+ for (InsertRowPlan plan : rowPlans) {
+ buffer.writeLong(plan.getTime());
+ plan.serializeMeasurementsAndValues(buffer); // each row also appends its own infer-type flag and index
+ }
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
this.deviceId = new PartialPath(readString(buffer));
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index b3572b5..29b74f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -217,6 +218,18 @@ public class InsertTabletPlan extends InsertPlan {
writeValues(buffer);
}
+ @Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, device path, measurements, data types, times, values (+ index)
+ int type = PhysicalPlanType.BATCHINSERT.ordinal();
+ buffer.writeByte((byte) type); // the plan type tag is the enum ordinal narrowed to one byte
+
+ putString(buffer, deviceId.getFullPath());
+ writeMeasurements(buffer);
+ writeDataTypes(buffer);
+ writeTimes(buffer);
+ writeValues(buffer); // writeValues also appends the plan index
+ }
+
private void writeMeasurements(ByteBuffer buffer) {
buffer
.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
@@ -227,6 +240,16 @@ public class InsertTabletPlan extends InsertPlan {
}
}
+ private void writeMeasurements(ByteBuf buffer) { // writes the measurement count followed by the non-null measurement names
+ buffer
+ .writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); // count excludes failed measurements
+ for (String m : measurements) {
+ if (m != null) { // null entries are skipped
+ putString(buffer, m);
+ }
+ }
+ }
+
private void writeDataTypes(ByteBuffer buffer) {
for (int i = 0, dataTypesLength = dataTypes.length; i < dataTypesLength; i++) {
TSDataType dataType = dataTypes[i];
@@ -236,6 +259,15 @@ public class InsertTabletPlan extends InsertPlan {
}
}
+ private void writeDataTypes(ByteBuf buffer) { // writes one data type per column whose measurement is non-null
+ for (int i = 0, dataTypesLength = dataTypes.length; i < dataTypesLength; i++) {
+ TSDataType dataType = dataTypes[i];
+ if (measurements[i] != null) { // keeps the type list aligned with the names written by writeMeasurements
+ dataType.serializeTo(buffer);
+ }
+ }
+ }
+
private void writeTimes(ByteBuffer buffer) {
if (isExecuting) {
buffer.putInt(end - start);
@@ -259,6 +291,29 @@ public class InsertTabletPlan extends InsertPlan {
}
}
+ private void writeTimes(ByteBuf buffer) { // writes the row count followed by the timestamps
+ if (isExecuting) {
+ buffer.writeInt(end - start); // while executing only the [start, end) window is serialized
+ } else {
+ buffer.writeInt(rowCount);
+ }
+
+ if (timeBuffer == null) {
+ if (isExecuting) {
+ for (int i = start; i < end; i++) {
+ buffer.writeLong(times[i]);
+ }
+ } else {
+ for (long time : times) { // iterates the whole array; presumably times.length == rowCount - TODO confirm
+ buffer.writeLong(time);
+ }
+ }
+ } else {
+ buffer.writeBytes(timeBuffer.array()); // NOTE(review): copies the entire backing array, ignoring position/limit and the window above - confirm timeBuffer is exactly sized
+ timeBuffer = null; // the pre-encoded buffer is dropped after one use
+ }
+ }
+
private void writeValues(ByteBuffer buffer) {
if (valueBuffer == null) {
serializeValues(buffer);
@@ -270,6 +325,17 @@ public class InsertTabletPlan extends InsertPlan {
buffer.putLong(index);
}
+ private void writeValues(ByteBuf buffer) { // writes the column values and then the plan index
+ if (valueBuffer == null) {
+ serializeValues(buffer); // no pre-encoded bytes: encode from the column arrays
+ } else {
+ buffer.writeBytes(valueBuffer.array()); // NOTE(review): copies the entire backing array - confirm valueBuffer is exactly sized
+ valueBuffer = null; // the pre-encoded buffer is dropped after one use
+ }
+
+ buffer.writeLong(index);
+ }
+
private void serializeValues(DataOutputStream outputStream) throws IOException {
for (int i = 0; i < measurements.length; i++) {
if (measurements[i] == null) {
@@ -288,6 +354,15 @@ public class InsertTabletPlan extends InsertPlan {
}
}
+ private void serializeValues(ByteBuf buffer) { // encodes every column whose measurement is non-null, in column order
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] == null) { // columns with a null measurement are not serialized
+ continue;
+ }
+ serializeColumn(dataTypes[i], columns[i], buffer, start, end); // passes the start/end fields; serializeColumn ignores them unless isExecuting
+ }
+ }
+
private void serializeColumn(TSDataType dataType, Object column, ByteBuffer buffer,
int start, int end) {
int curStart = isExecuting ? start : 0;
@@ -336,6 +411,54 @@ public class InsertTabletPlan extends InsertPlan {
}
}
+ private void serializeColumn(TSDataType dataType, Object column, ByteBuf buffer, // writes one column's values; column is cast to the array type matching dataType
+ int start, int end) {
+ int curStart = isExecuting ? start : 0; // outside execution the whole range [0, rowCount) is written
+ int curEnd = isExecuting ? end : rowCount;
+ switch (dataType) {
+ case INT32:
+ int[] intValues = (int[]) column;
+ for (int j = curStart; j < curEnd; j++) {
+ buffer.writeInt(intValues[j]);
+ }
+ break;
+ case INT64:
+ long[] longValues = (long[]) column;
+ for (int j = curStart; j < curEnd; j++) {
+ buffer.writeLong(longValues[j]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = (float[]) column;
+ for (int j = curStart; j < curEnd; j++) {
+ buffer.writeFloat(floatValues[j]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = (double[]) column;
+ for (int j = curStart; j < curEnd; j++) {
+ buffer.writeDouble(doubleValues[j]);
+ }
+ break;
+ case BOOLEAN:
+ boolean[] boolValues = (boolean[]) column;
+ for (int j = curStart; j < curEnd; j++) {
+ buffer.writeByte(BytesUtils.boolToByte(boolValues[j])); // each boolean occupies one byte
+ }
+ break;
+ case TEXT:
+ Binary[] binaryValues = (Binary[]) column;
+ for (int j = curStart; j < curEnd; j++) {
+ buffer.writeInt(binaryValues[j].getLength()); // length prefix, then the raw bytes
+ buffer.writeBytes(binaryValues[j].getValues());
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException( // any other data type is rejected
+ String.format(DATATYPE_UNSUPPORTED, dataType));
+ }
+ }
+
private void serializeColumn(TSDataType dataType, Object column, DataOutputStream outputStream,
int start, int end) throws IOException {
int curStart = isExecuting ? start : 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
index bbf1d78..38bca24 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -333,6 +334,33 @@ public class AuthorPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, author type, four nullable strings, permissions, node path, index
+ int type = this.getPlanType(super.getOperatorType());
+ buffer.writeByte((byte) type); // the plan type tag is narrowed to one byte
+ buffer.writeInt(authorType.ordinal());
+ putString(buffer, userName);
+ putString(buffer, roleName);
+ putString(buffer, password);
+ putString(buffer, newPassword);
+ if (permissions == null) {
+ buffer.writeByte((byte) 0); // one-byte flag: no permission set follows
+ } else {
+ buffer.writeByte((byte) 1); // one-byte flag, same width as the null branch so the reader can decode either case
+ buffer.writeInt(permissions.size());
+ for (int permission : permissions) {
+ buffer.writeInt(permission);
+ }
+ }
+ if (nodeName == null) {
+ putString(buffer, null); // absent node path is encoded as a null string
+ } else {
+ putString(buffer, nodeName.getFullPath());
+ }
+
+ buffer.writeLong(index); // the plan index is written last
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
this.authorType = AuthorType.values()[buffer.getInt()];
this.userName = readString(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java
index a6bf1aa..6d84982 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeAliasPlan.java
@@ -19,17 +19,17 @@
package org.apache.iotdb.db.qp.physical.sys;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
public class ChangeAliasPlan extends PhysicalPlan {
private PartialPath path;
@@ -79,6 +79,14 @@ public class ChangeAliasPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, full path, alias; no plan index is written
+ int type = PhysicalPlanType.CHANGE_ALIAS.ordinal();
+ buffer.writeByte((byte) type);
+ putString(buffer, path.getFullPath());
+ putString(buffer, alias); // putString handles a null alias
+ }
+
+ @Override
public void serialize(DataOutputStream stream) throws IOException {
stream.write((byte) PhysicalPlanType.CHANGE_ALIAS.ordinal());
putString(stream, path.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java
index ba80502..2c4361f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ChangeTagOffsetPlan.java
@@ -19,17 +19,17 @@
package org.apache.iotdb.db.qp.physical.sys;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
public class ChangeTagOffsetPlan extends PhysicalPlan {
private PartialPath path;
@@ -79,6 +79,14 @@ public class ChangeTagOffsetPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, full path, offset; no plan index is written
+ int type = PhysicalPlanType.CHANGE_TAG_OFFSET.ordinal();
+ buffer.writeByte((byte) type);
+ putString(buffer, path.getFullPath());
+ buffer.writeLong(offset);
+ }
+
+ @Override
public void serialize(DataOutputStream stream) throws IOException {
stream.write((byte) PhysicalPlanType.CHANGE_TAG_OFFSET.ordinal());
putString(stream, path.getFullPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateIndexPlan.java
index f614e42..691818d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateIndexPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -134,6 +135,28 @@ public class CreateIndexPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, index type byte, time, path count, paths, props flag (+ props), index
+ int type = PhysicalPlanType.CREATE_INDEX.ordinal();
+ buffer.writeByte((byte) type);
+ buffer.writeByte((byte) indexType.serialize()); // index type narrowed to one byte
+ buffer.writeLong(time);
+ buffer.writeInt(paths.size()); // count prefix for the path list that follows
+ for (PartialPath path : paths) {
+ putString(buffer, path.getFullPath());
+ }
+
+ // props
+ if (props != null && !props.isEmpty()) {
+ buffer.writeByte((byte) 1); // flag 1: a props map follows
+ ReadWriteIOUtils.write(props, buffer);
+ } else {
+ buffer.writeByte((byte) 0); // null and empty props are both encoded as flag 0
+ }
+
+ buffer.writeLong(index);
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
indexType = IndexType.deserialize(buffer.get());
time = buffer.getLong();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index e381998..ba06ff4 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -227,6 +228,33 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
buffer.putLong(index);
}
+ @Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, path count, paths, data types, encodings, compressors, optional sections, index
+ int type = PhysicalPlanType.CREATE_MULTI_TIMESERIES.ordinal();
+ buffer.writeByte((byte) type);
+ buffer.writeInt(paths.size()); // the only count written; presumably the three lists below have the same size - TODO confirm
+
+ for (PartialPath path : paths) {
+ putString(buffer, path.getFullPath());
+ }
+
+ for (TSDataType dataType : dataTypes) {
+ buffer.writeByte((byte) dataType.ordinal()); // enum ordinal narrowed to one byte
+ }
+
+ for (TSEncoding encoding : encodings) {
+ buffer.writeByte((byte) encoding.ordinal());
+ }
+
+ for (CompressionType compressor : compressors) {
+ buffer.writeByte((byte) compressor.ordinal());
+ }
+
+ serializeOptional(buffer); // alias, props, tags and attributes, each behind a presence flag
+
+ buffer.writeLong(index);
+ }
+
private void serializeOptional(ByteBuffer buffer) {
if (alias != null) {
buffer.put((byte) 1);
@@ -257,6 +285,36 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
}
}
+ private void serializeOptional(ByteBuf buffer) { // writes alias, props, tags, attributes; each is preceded by a one-byte presence flag
+ if (alias != null) {
+ buffer.writeByte((byte) 1);
+ putStrings(buffer, alias); // no element count is written for the alias list
+ } else {
+ buffer.writeByte((byte) 0);
+ }
+
+ if (props != null) { // only null is treated as absent; an empty collection is still written
+ buffer.writeByte((byte) 1);
+ ReadWriteIOUtils.write(props, buffer);
+ } else {
+ buffer.writeByte((byte) 0);
+ }
+
+ if (tags != null) {
+ buffer.writeByte((byte) 1);
+ ReadWriteIOUtils.write(tags, buffer);
+ } else {
+ buffer.writeByte((byte) 0);
+ }
+
+ if (attributes != null) {
+ buffer.writeByte((byte) 1);
+ ReadWriteIOUtils.write(attributes, buffer);
+ } else {
+ buffer.writeByte((byte) 0);
+ }
+ }
+
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
int totalSize = buffer.getInt();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java
index 7716af0..6fa38fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateTimeSeriesPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -246,6 +247,52 @@ public class CreateTimeSeriesPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, path, data type, encoding, compressor, tag offset, four optional sections, index
+ buffer.writeByte((byte) PhysicalPlanType.CREATE_TIMESERIES.ordinal());
+ byte[] bytes = path.getFullPath().getBytes(); // NOTE(review): getBytes() uses the platform default charset - confirm the reader decodes with the same one
+ buffer.writeInt(bytes.length); // length prefix, then the raw path bytes
+ buffer.writeBytes(bytes);
+ buffer.writeByte((byte) dataType.ordinal()); // enum ordinals narrowed to one byte each
+ buffer.writeByte((byte) encoding.ordinal());
+ buffer.writeByte((byte) compressor.ordinal());
+ buffer.writeLong(tagOffset);
+
+ // alias
+ if (alias != null) {
+ buffer.writeByte((byte) 1); // flag 1: an alias string follows
+ ReadWriteIOUtils.write(alias, buffer);
+ } else {
+ buffer.writeByte((byte) 0);
+ }
+
+ // props
+ if (props != null && !props.isEmpty()) { // null and empty maps are both encoded as flag 0
+ buffer.writeByte((byte) 1);
+ ReadWriteIOUtils.write(props, buffer);
+ } else {
+ buffer.writeByte((byte) 0);
+ }
+
+ // tags
+ if (tags != null && !tags.isEmpty()) {
+ buffer.writeByte((byte) 1);
+ ReadWriteIOUtils.write(tags, buffer);
+ } else {
+ buffer.writeByte((byte) 0);
+ }
+
+ // attributes
+ if (attributes != null && !attributes.isEmpty()) {
+ buffer.writeByte((byte) 1);
+ ReadWriteIOUtils.write(attributes, buffer);
+ } else {
+ buffer.writeByte((byte) 0);
+ }
+
+ buffer.writeLong(index); // the plan index is written last
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
int length = buffer.getInt();
byte[] bytes = new byte[length];
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DataAuthPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DataAuthPlan.java
index b31e795..bd746ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DataAuthPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DataAuthPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -77,6 +78,19 @@ public class DataAuthPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, user count, user names, index
+ int type = this.getPlanType(super.getOperatorType()); // type tag is derived from the operator type rather than a fixed constant
+ buffer.writeByte((byte) type);
+ buffer.writeInt(users.size());
+
+ for (String user : users) {
+ putString(buffer, user);
+ }
+
+ buffer.writeLong(index);
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) {
int userSize = buffer.getInt();
this.users = new ArrayList<>(userSize);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteStorageGroupPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteStorageGroupPlan.java
index 5f8a13f..6e9cb9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteStorageGroupPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteStorageGroupPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -71,6 +72,18 @@ public class DeleteStorageGroupPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, path count, paths, index
+ int type = PhysicalPlanType.DELETE_STORAGE_GROUP.ordinal();
+ buffer.writeByte((byte) type);
+ buffer.writeInt(this.getPaths().size()); // count and elements both come from getPaths()
+ for (PartialPath path : this.getPaths()) {
+ putString(buffer, path.getFullPath());
+ }
+
+ buffer.writeLong(index);
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
int pathNum = buffer.getInt();
this.deletePathList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 24373be..cd0c3a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -75,6 +76,18 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, path count, paths, index
+ int type = PhysicalPlanType.DELETE_TIMESERIES.ordinal();
+ buffer.writeByte((byte) type);
+ buffer.writeInt(deletePathList.size()); // count prefix for the path list that follows
+ for (PartialPath path : deletePathList) {
+ putString(buffer, path.getFullPath());
+ }
+
+ buffer.writeLong(index);
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
int pathNumber = buffer.getInt();
deletePathList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropIndexPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropIndexPlan.java
index 41b4ad8..b69da72 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropIndexPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropIndexPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -94,6 +95,20 @@ public class DropIndexPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, index type byte, path count, paths, index
+ int type = PhysicalPlanType.DROP_INDEX.ordinal();
+ buffer.writeByte((byte) type);
+ buffer.writeByte((byte) indexType.serialize()); // index type narrowed to one byte
+
+ buffer.writeInt(paths.size());
+ for (PartialPath path : paths) {
+ putString(buffer, path.getFullPath());
+ }
+
+ buffer.writeLong(index);
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
indexType = IndexType.deserialize(buffer.get());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
index c6c0ef3..6557d12 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -158,6 +159,19 @@ public class FlushPlan extends PhysicalPlan {
writeStorageGroupPartitionIds(buffer);
}
+ @Override
+ public void serialize(ByteBuf buffer) { // layout: type byte, isSeq byte, isSync byte, storage-group partition ids; no plan index is written
+ int type = PhysicalPlanType.FLUSH.ordinal();
+ buffer.writeByte((byte) type);
+ if (isSeq == null) {
+ buffer.writeByte((byte) 2); // isSeq is tri-state: 2 = null, 1 = true, 0 = false
+ } else {
+ buffer.writeByte((byte) (Boolean.TRUE.equals(isSeq) ? 1 : 0));
+ }
+ buffer.writeByte((byte) (isSync ? 1 : 0));
+ writeStorageGroupPartitionIds(buffer);
+ }
+
public void writeStorageGroupPartitionIds(ByteBuffer buffer) {
if (storageGroupPartitionIds == null) {
// null value
@@ -184,6 +198,32 @@ public class FlushPlan extends PhysicalPlan {
}
}
+ public void writeStorageGroupPartitionIds(ByteBuf buffer) { // ByteBuf overload of the ByteBuffer method of the same name
+ if (storageGroupPartitionIds == null) {
+ // null marker: the map itself is absent
+ buffer.writeByte((byte) 0);
+ } else {
+ // non-null marker, followed by the number of map entries
+ buffer.writeByte((byte) 1);
+ buffer.writeInt(storageGroupPartitionIds.size());
+ for (Entry<PartialPath, List<Pair<Long, Boolean>>> entry : storageGroupPartitionIds
+ .entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey().getFullPath(), buffer); // key: full path string
+ if (entry.getValue() == null) {
+ // null marker: this entry has no pair list
+ buffer.writeByte((byte) 0);
+ } else {
+ buffer.writeByte((byte) 1); // non-null marker, followed by the pair count
+ ReadWriteIOUtils.write(entry.getValue().size(), buffer);
+ for (Pair<Long, Boolean> pair : entry.getValue()) {
+ ReadWriteIOUtils.write(pair.left, buffer); // the pair's Long component
+ ReadWriteIOUtils.write(pair.right, buffer); // the pair's Boolean component
+ }
+ }
+ }
+ }
+ }
+
@Override
public void deserialize(ByteBuffer buffer) {
byte isSeqFlag = buffer.get();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MNodePlan.java
index e5c5140..0237492 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MNodePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MNodePlan.java
@@ -19,16 +19,16 @@
package org.apache.iotdb.db.qp.physical.sys;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
public class MNodePlan extends PhysicalPlan {
protected String name;
@@ -78,6 +78,14 @@ public class MNodePlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // serializes this plan into a Netty ByteBuf
+ buffer.writeByte((byte) PhysicalPlanType.MNODE.ordinal()); // plan-type tag, one byte
+ putString(buffer, name); // node name
+ buffer.writeInt(childSize); // childSize field as a 4-byte int
+ buffer.writeLong(index); // the plan's index field is written last
+ }
+
+ @Override
public void serialize(DataOutputStream stream) throws IOException {
stream.write((byte) PhysicalPlanType.MNODE.ordinal());
putString(stream, name);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java
index c69f53f..713a4d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MeasurementMNodePlan.java
@@ -18,16 +18,17 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public class MeasurementMNodePlan extends MNodePlan {
private MeasurementSchema schema;
@@ -67,6 +68,19 @@ public class MeasurementMNodePlan extends MNodePlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // serializes this plan into a Netty ByteBuf
+ buffer.writeByte((byte) PhysicalPlanType.MEASUREMENT_MNODE.ordinal()); // plan-type tag, one byte
+
+ putString(buffer, name); // node name
+ putString(buffer, alias); // alias string
+ buffer.writeLong(offset); // offset field as an 8-byte long
+ buffer.writeInt(childSize); // childSize field as a 4-byte int
+ schema.serializeTo(buffer); // the MeasurementSchema writes itself into the same buffer
+
+ buffer.writeLong(index); // the plan's index field is written last
+ }
+
+ @Override
public void serialize(DataOutputStream stream) throws IOException {
stream.write((byte) PhysicalPlanType.MEASUREMENT_MNODE.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetStorageGroupPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetStorageGroupPlan.java
index a7a6e18..1f17489 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetStorageGroupPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetStorageGroupPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -70,6 +71,13 @@ public class SetStorageGroupPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // serializes this plan into a Netty ByteBuf
+ buffer.writeByte((byte) PhysicalPlanType.SET_STORAGE_GROUP.ordinal()); // plan-type tag, one byte
+ putString(buffer, path.getFullPath()); // full path string; deserialize() reads it back first
+ buffer.writeLong(index); // index follows the path, matching the read order in deserialize()
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
path = new PartialPath(readString(buffer));
this.index = buffer.getLong();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTTLPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTTLPlan.java
index deea2db..8001846 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTTLPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetTTLPlan.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.qp.physical.sys;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -76,6 +77,16 @@ public class SetTTLPlan extends PhysicalPlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // serializes this plan into a Netty ByteBuf
+ int type = PhysicalPlanType.TTL.ordinal();
+ buffer.writeByte((byte) type); // plan-type tag: ordinal of TTL narrowed to one byte
+ buffer.writeLong(dataTTL); // dataTTL first, the same order deserialize() reads it in
+ putString(buffer, storageGroup.getFullPath()); // then the storage group's full path string
+
+ buffer.writeLong(index); // the plan's index field is written last
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
this.dataTTL = buffer.getLong();
this.storageGroup = new PartialPath(readString(buffer));
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
index 451475b..f71d8bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StorageGroupMNodePlan.java
@@ -19,15 +19,15 @@
package org.apache.iotdb.db.qp.physical.sys;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
public class StorageGroupMNodePlan extends MNodePlan {
private long dataTTL;
@@ -67,6 +67,16 @@ public class StorageGroupMNodePlan extends MNodePlan {
}
@Override
+ public void serialize(ByteBuf buffer) { // serializes this plan into a Netty ByteBuf
+ buffer.writeByte((byte) PhysicalPlanType.STORAGE_GROUP_MNODE.ordinal()); // plan-type tag, one byte
+ putString(buffer, name); // node name
+ buffer.writeLong(dataTTL); // dataTTL field as an 8-byte long
+ buffer.writeInt(childSize); // childSize field as a 4-byte int
+
+ buffer.writeLong(index); // the plan's index field is written last
+ }
+
+ @Override
public void serialize(DataOutputStream stream) throws IOException {
stream.write((byte) PhysicalPlanType.STORAGE_GROUP_MNODE.ordinal());
putString(stream, name);
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
index 07ac023..02b3663 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.writelog.io;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -38,6 +39,8 @@ public interface ILogWriter {
*/
void write(ByteBuffer logBuffer) throws IOException;
+ void write(ByteBuf logBuffer) throws IOException; // Netty ByteBuf overload of write(ByteBuffer)
+
/**
* force the OS/FileSystem to flush its cache to make sure logs are persisted.
* @throws IOException
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
index 1b21dfe..3a58835 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.writelog.io;
+import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -30,7 +31,6 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.utils.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
/**
* LogWriter writes the binary logs into a file using FileChannel together with check sums of
@@ -43,8 +43,8 @@ public class LogWriter implements ILogWriter {
private FileOutputStream fileOutputStream;
private FileChannel channel;
private final CRC32 checkSummer = new CRC32();
- private final ByteBuffer lengthBuffer = ByteBuffer.allocateDirect(4);
- private final ByteBuffer checkSumBuffer = ByteBuffer.allocateDirect(8);
+ private final ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+ private final ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
private final boolean forceEachWrite;
/**
@@ -106,6 +106,39 @@ public class LogWriter implements ILogWriter {
}
@Override
+ public void write(ByteBuf logBuffer) throws IOException {
+ // Appends one record to the log file: a 4-byte length, the readable bytes of logBuffer,
+ // then the 8-byte CRC32 of those bytes. logBuffer's reader/writer indices are not moved.
+ // Throws IOException if the channel stops accepting bytes; a ClosedChannelException is
+ // logged and swallowed, as in the ByteBuffer overload's interrupt handling.
+ if (channel == null) {
+ // the channel is opened lazily, in append mode
+ fileOutputStream = new FileOutputStream(logFile, true);
+ channel = fileOutputStream.getChannel();
+ }
+ // The record is exactly the readable region [readerIndex, readerIndex + logSize).
+ int logStart = logBuffer.readerIndex();
+ int logSize = logBuffer.readableBytes();
+ // 4 bytes size and 8 bytes check sum
+
+ checkSummer.reset();
+ // nioBuffer() exposes the readable region, so the CRC covers the same bytes written below
+ checkSummer.update(logBuffer.nioBuffer());
+ long checkSum = checkSummer.getValue();
+
+ lengthBuffer.clear();
+ checkSumBuffer.clear();
+ lengthBuffer.putInt(logSize);
+ checkSumBuffer.putLong(checkSum);
+ lengthBuffer.flip();
+ checkSumBuffer.flip();
+
+ try {
+ channel.write(lengthBuffer);
+ // Relative channel write starting at readerIndex: it advances the channel position, so the
+ // checksum lands right after the payload without relying on append-mode behaviour, and the
+ // loop finishes the payload even if a single call writes fewer bytes than requested.
+ int offset = logStart;
+ int remaining = logSize;
+ while (remaining > 0) {
+ int written = logBuffer.getBytes(offset, channel, remaining);
+ if (written <= 0) {
+ // guard against spinning forever if the channel makes no progress
+ throw new IOException("Failed to write WAL record: channel accepted no bytes");
+ }
+ offset += written;
+ remaining -= written;
+ }
+ channel.write(checkSumBuffer);
+
+ if (this.forceEachWrite) {
+ channel.force(true);
+ }
+ } catch (ClosedChannelException ignored) {
+ logger.warn("someone interrupt current thread, so no need to do write for io safety");
+ }
+ }
+
+ @Override
public void force() throws IOException {
if (channel != null && channel.isOpen()) {
channel.force(true);
@@ -114,25 +147,15 @@ public class LogWriter implements ILogWriter {
@Override
public void close() throws IOException {
- try {
- if (channel != null) {
- if (channel.isOpen()) {
- channel.force(true);
- }
- fileOutputStream.close();
- fileOutputStream = null;
- channel.close();
- channel = null;
- }
- } finally {
- if (lengthBuffer.isDirect()) {
- ((DirectBuffer) lengthBuffer).cleaner().clean();
- }
- if (checkSumBuffer.isDirect()) {
- ((DirectBuffer) checkSumBuffer).cleaner().clean();
+ if (channel != null) {
+ if (channel.isOpen()) {
+ channel.force(true);
}
+ fileOutputStream.close();
+ fileOutputStream = null;
+ channel.close();
+ channel = null;
}
-
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index de41479..64c9194 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.db.writelog.node;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.Comparator;
@@ -42,7 +42,6 @@ import org.apache.iotdb.db.writelog.io.LogWriter;
import org.apache.iotdb.db.writelog.io.MultiFileLogReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
/**
* This WriteLogNode is used to manage insert ahead logs of a TsFile.
@@ -51,6 +50,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
public static final String WAL_FILE_NAME = "wal";
private static final Logger logger = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
+ private static final PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
private String identifier;
@@ -60,11 +60,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private ByteBuffer logBufferWorking = ByteBuffer
- .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
- private ByteBuffer logBufferIdle = ByteBuffer
- .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
- private ByteBuffer logBufferFlushing;
+ private final int WAL_BUFFER_THRESHOLD =
+ IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2;
+
+ private ByteBuf logBufferWorking = allocator.directBuffer(WAL_BUFFER_THRESHOLD);
+ private ByteBuf logBufferIdle = allocator.directBuffer(WAL_BUFFER_THRESHOLD);
+ private ByteBuf logBufferFlushing;
private final Object switchBufferCondition = new Object();
private ReentrantLock lock = new ReentrantLock();
@@ -104,23 +105,16 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
if (bufferedLogNum >= config.getFlushWalThreshold()) {
sync();
}
- } catch (BufferOverflowException e) {
- throw new IOException(
- "Log cannot fit into the buffer, please increase wal_buffer_size", e);
} finally {
lock.unlock();
}
}
private void putLog(PhysicalPlan plan) {
- logBufferWorking.mark();
- try {
- plan.serialize(logBufferWorking);
- } catch (BufferOverflowException e) {
- logger.info("WAL BufferOverflow !");
- logBufferWorking.reset();
+ logBufferWorking.markWriterIndex(); // NOTE(review): no resetWriterIndex() follows in this method, so the mark looks unused - confirm
+ plan.serialize(logBufferWorking); // the plan is always appended to the working buffer before the size check
+ if (logBufferWorking.readableBytes() >= WAL_BUFFER_THRESHOLD) { // sync once buffered bytes reach the threshold; NOTE(review): presumably the ByteBuf may grow past it first - confirm
sync();
- plan.serialize(logBufferWorking);
}
bufferedLogNum++;
}
@@ -150,13 +144,6 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
Thread.currentThread().interrupt();
logger.warn("Waiting for current buffer being flushed interrupted");
} finally {
- logBufferFlushing = null;
- if (logBufferWorking.isDirect()) {
- ((DirectBuffer) logBufferWorking).cleaner().clean();
- }
- if (logBufferIdle.isDirect()) {
- ((DirectBuffer) logBufferIdle).cleaner().clean();
- }
lock.unlock();
}
}
@@ -212,6 +199,15 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory));
deleted = true;
} finally {
+ if (logBufferFlushing != null && logBufferFlushing.refCnt() != 0) {
+ logBufferFlushing.release();
+ }
+ if (logBufferIdle != null && logBufferIdle.refCnt() != 0) {
+ logBufferIdle.release();
+ }
+ if (logBufferWorking != null && logBufferWorking.refCnt() != 0) {
+ logBufferWorking.release();
+ }
lock.unlock();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index fe5610c..4566ae9 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -239,7 +239,7 @@ public class WriteLogNodeTest {
} catch (IOException e) {
caught = true;
}
- assertTrue(caught);
+ assertFalse(caught);
logNode.delete();
}
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index e05e38a..3418bbb 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -58,6 +58,11 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>4.1.22.Final</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 505be1b..ce99ff8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.file.metadata.enums;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -86,6 +87,10 @@ public enum TSDataType {
byteBuffer.putShort(serialize());
}
+ public void serializeTo(ByteBuf byteBuffer) { // ByteBuf overload: writes serialize() as a 2-byte short
+ byteBuffer.writeShort(serialize());
+ }
+
public void serializeTo(DataOutputStream outputStream) throws IOException {
outputStream.writeShort(serialize());
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index d4b7570..ce6ba0e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -28,6 +28,7 @@ import static org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.LO
import static org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.NULL;
import static org.apache.iotdb.tsfile.utils.ReadWriteIOUtils.ClassSerializeId.STRING;
+import io.netty.buffer.ByteBuf;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -157,12 +158,38 @@ public class ReadWriteIOUtils {
return length;
}
+ public static int write(Map<String, String> map, ByteBuf buffer) { // writes entry count, then length-prefixed key and value bytes; returns total bytes written
+ int length = 0;
+ byte[] bytes;
+ buffer.writeInt(map.size()); // entry count as a 4-byte int
+ length += 4;
+ for (Entry<String, String> entry : map.entrySet()) {
+ bytes = entry.getKey().getBytes(); // NOTE(review): getBytes() without a charset uses the platform default - confirm the reader decodes with the same one
+ buffer.writeInt(bytes.length); // key length prefix
+ length += 4;
+ buffer.writeBytes(bytes);
+ length += bytes.length;
+ bytes = entry.getValue().getBytes(); // a null value would throw NullPointerException here
+ buffer.writeInt(bytes.length); // value length prefix
+ length += 4;
+ buffer.writeBytes(bytes);
+ length += bytes.length;
+ }
+ return length;
+ }
+
public static void write(List<Map<String, String>> maps, ByteBuffer buffer) {
for (Map<String, String> map : maps) {
write(map, buffer);
}
}
+ public static void write(List<Map<String, String>> maps, ByteBuf buffer) { // writes each map in list order; no list-size prefix is written here
+ for (Map<String, String> map : maps) {
+ write(map, buffer);
+ }
+ }
+
/**
* write a int value to outputStream according to flag. If flag is true, write 1, else write 0.
*/
@@ -190,6 +217,18 @@ public class ReadWriteIOUtils {
return 1;
}
+ public static int write(Boolean flag, ByteBuf buffer) { // writes the flag as one byte and returns 1 (bytes written)
+ byte a;
+ if (Boolean.TRUE.equals(flag)) { // a null flag takes the else branch and is written as 0
+ a = 1;
+ } else {
+ a = 0;
+ }
+
+ buffer.writeByte(a);
+ return 1;
+ }
+
/**
* write a byte n.
*
@@ -223,6 +262,11 @@ public class ReadWriteIOUtils {
return Byte.BYTES;
}
+ public static int write(byte n, ByteBuf buffer) { // writes one byte; returns Byte.BYTES
+ buffer.writeByte(n);
+ return Byte.BYTES;
+ }
+
/**
* write a short n to byteBuffer.
*
@@ -233,6 +277,11 @@ public class ReadWriteIOUtils {
return SHORT_LEN;
}
+ public static int write(short n, ByteBuf buffer) { // writes a short; returns SHORT_LEN
+ buffer.writeShort(n);
+ return SHORT_LEN;
+ }
+
/**
* write a short n to byteBuffer.
*
@@ -244,6 +293,12 @@ public class ReadWriteIOUtils {
return INT_LEN + n.getLength();
}
+ public static int write(Binary n, ByteBuf buffer) { // writes a length-prefixed Binary; returns INT_LEN plus the value length
+ buffer.writeInt(n.getLength()); // 4-byte length prefix
+ buffer.writeBytes(n.getValues()); // the raw value bytes
+ return INT_LEN + n.getLength();
+ }
+
/**
* write a int n to outputStream.
*
@@ -275,6 +330,11 @@ public class ReadWriteIOUtils {
return INT_LEN;
}
+ public static int write(int n, ByteBuf buffer) { // writes an int; returns INT_LEN
+ buffer.writeInt(n);
+ return INT_LEN;
+ }
+
/**
* write a float n to outputStream.
*
@@ -316,6 +376,11 @@ public class ReadWriteIOUtils {
return LONG_LEN;
}
+ public static int write(long n, ByteBuf buffer) { // writes a long; returns LONG_LEN
+ buffer.writeLong(n);
+ return LONG_LEN;
+ }
+
/**
* write a float n to byteBuffer.
*/
@@ -324,6 +389,11 @@ public class ReadWriteIOUtils {
return FLOAT_LEN;
}
+ public static int write(float n, ByteBuf buffer) { // writes a float; returns FLOAT_LEN
+ buffer.writeFloat(n);
+ return FLOAT_LEN;
+ }
+
/**
* write a double n to byteBuffer.
*/
@@ -332,6 +402,11 @@ public class ReadWriteIOUtils {
return DOUBLE_LEN;
}
+ public static int write(double n, ByteBuf buffer) { // writes a double; returns DOUBLE_LEN
+ buffer.writeDouble(n);
+ return DOUBLE_LEN;
+ }
+
/**
* write string to outputStream.
@@ -369,6 +444,18 @@ public class ReadWriteIOUtils {
return len;
}
+ public static int write(String s, ByteBuf buffer) { // writes a length-prefixed string; returns the number of bytes written
+ if (s == null) {
+ return write(-1, buffer); // a null string is encoded as the int length -1 with no payload
+ }
+ int len = 0;
+ byte[] bytes = s.getBytes(); // NOTE(review): getBytes() without a charset uses the platform default - confirm the reader decodes with the same one
+ len += write(bytes.length, buffer); // 4-byte length prefix
+ buffer.writeBytes(bytes);
+ len += bytes.length;
+ return len;
+ }
+
/**
* write byteBuffer.capacity and byteBuffer.array to outputStream.
*/
@@ -429,6 +516,11 @@ public class ReadWriteIOUtils {
return write(n, buffer);
}
+ public static int write(TSDataType dataType, ByteBuf buffer) { // writes the data type's short code via write(short, ByteBuf)
+ short n = dataType.serialize();
+ return write(n, buffer);
+ }
+
/**
* TSEncoding.
*/
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 11030c8..78c288f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.write.schema;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -269,6 +270,30 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
return byteLen;
}
+ public int serializeTo(ByteBuf buffer) { // serializes this schema into a Netty ByteBuf; returns the sum of the helper calls' byte counts
+ int byteLen = 0;
+
+ byteLen += ReadWriteIOUtils.write(measurementId, buffer); // measurement id string
+
+ byteLen += ReadWriteIOUtils.write((short) type, buffer); // type, cast to short before writing
+
+ byteLen += ReadWriteIOUtils.write((short) encoding, buffer); // encoding, cast to short before writing
+
+ byteLen += ReadWriteIOUtils.write((short) compressor, buffer); // compressor, cast to short before writing
+
+ if (props == null) {
+ byteLen += ReadWriteIOUtils.write(0, buffer); // null props are written as an entry count of 0
+ } else {
+ byteLen += ReadWriteIOUtils.write(props.size(), buffer); // entry count, then each key and value as strings
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ byteLen += ReadWriteIOUtils.write(entry.getKey(), buffer);
+ byteLen += ReadWriteIOUtils.write(entry.getValue(), buffer);
+ }
+ }
+
+ return byteLen;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {