You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/26 14:49:01 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-2983] Serialization error in Partial insert (#5673)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new ee6ae34f8b [To rel/0.13][IOTDB-2983] Serialization error in Partial insert (#5673)
ee6ae34f8b is described below
commit ee6ae34f8bedf577be69a334003a25c09fb19240
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Tue Apr 26 22:48:54 2022 +0800
[To rel/0.13][IOTDB-2983] Serialization error in Partial insert (#5673)
---
.../db/integration/IoTDBPartialInsertionIT.java | 62 ++++++++++++
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 12 +--
.../db/qp/physical/crud/InsertTabletPlan.java | 112 ++++++++-------------
3 files changed, 106 insertions(+), 80 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBPartialInsertionIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBPartialInsertionIT.java
index 11fae494e9..b8d8789edf 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBPartialInsertionIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBPartialInsertionIT.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.integration;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.integration.env.ConfigFactory;
import org.apache.iotdb.integration.env.EnvFactory;
import org.apache.iotdb.itbase.category.ClusterTest;
@@ -25,19 +27,26 @@ import org.apache.iotdb.itbase.category.LocalStandaloneTest;
import org.apache.iotdb.jdbc.IoTDBSQLException;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({LocalStandaloneTest.class, ClusterTest.class})
public class IoTDBPartialInsertionIT {
+ private final Logger logger = LoggerFactory.getLogger(IoTDBPartialInsertionIT.class);
@Before
public void setUp() throws Exception {
@@ -66,4 +75,57 @@ public class IoTDBPartialInsertionIT {
}
}
}
+
+ @Test
+ public void testPartialInsertionRestart() throws SQLException {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute("SET STORAGE GROUP TO root.sg");
+ statement.execute("CREATE TIMESERIES root.sg.d1.s1 datatype=text");
+ statement.execute("CREATE TIMESERIES root.sg.d1.s2 datatype=double");
+
+ try {
+ statement.execute("INSERT INTO root.sg.d1(time,s1,s2) VALUES(100,'test','test')");
+ } catch (IoTDBSQLException e) {
+ // ignore
+ }
+ }
+
+ long time = 0;
+ try {
+ EnvironmentUtils.restartDaemon();
+ StorageEngine.getInstance().recover();
+ // wait for recover
+ while (!StorageEngine.getInstance().isAllSgReady()) {
+ Thread.sleep(500);
+ time += 500;
+ if (time > 10000) {
+ logger.warn("wait too long in restart, wait for: " + time / 1000 + "s");
+ }
+ }
+ } catch (Exception e) {
+ Assert.fail();
+ }
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("SELECT s1 FROM root.sg.d1");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ assertEquals("test", resultSet.getString("root.sg.d1.s1"));
+ }
+ assertEquals(1, cnt);
+ }
+ hasResultSet = statement.execute("SELECT s2 FROM root.sg.d1");
+ assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ assertFalse(resultSet.next());
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index ee263cca21..85693d2879 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -342,8 +342,7 @@ public class InsertRowPlan extends InsertPlan {
}
void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException {
- stream.writeInt(
- measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+ stream.writeInt(measurements.length - getFailedMeasurementNumber());
for (String m : measurements) {
if (m != null) {
@@ -352,7 +351,7 @@ public class InsertRowPlan extends InsertPlan {
}
try {
- stream.writeInt(dataTypes.length);
+ stream.writeInt(values.length - getFailedMeasurementNumber());
putValues(stream);
} catch (QueryProcessException e) {
throw new IOException(e);
@@ -368,7 +367,6 @@ public class InsertRowPlan extends InsertPlan {
private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
for (int i = 0; i < values.length; i++) {
if (values[i] == null) {
- ReadWriteIOUtils.write(TYPE_NULL, outputStream);
continue;
}
// types are not determined, the situation mainly occurs when the plan uses string values
@@ -407,7 +405,6 @@ public class InsertRowPlan extends InsertPlan {
private void putValues(ByteBuffer buffer) throws QueryProcessException {
for (int i = 0; i < values.length; i++) {
if (values[i] == null) {
- ReadWriteIOUtils.write(TYPE_NULL, buffer);
continue;
}
// types are not determined, the situation mainly occurs when the plan uses string values
@@ -493,8 +490,7 @@ public class InsertRowPlan extends InsertPlan {
}
void serializeMeasurementsAndValues(ByteBuffer buffer) {
- buffer.putInt(
- measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+ buffer.putInt(measurements.length - getFailedMeasurementNumber());
for (String measurement : measurements) {
if (measurement != null) {
@@ -502,7 +498,7 @@ public class InsertRowPlan extends InsertPlan {
}
}
try {
- buffer.putInt(dataTypes.length);
+ buffer.putInt(values.length - getFailedMeasurementNumber());
putValues(buffer);
} catch (QueryProcessException e) {
logger.error("Failed to serialize values for {}", this, e);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index d761506195..30ec3ebfb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -51,11 +51,9 @@ public class InsertTabletPlan extends InsertPlan {
private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported.";
private long[] times; // times should be sorted. It is done in the session API.
- private ByteBuffer timeBuffer;
private BitMap[] bitMaps;
private Object[] columns;
- private ByteBuffer valueBuffer;
private int rowCount = 0;
// indicate whether this plan has been set 'start' or 'end' in order to support plan transmission
// without data loss in cluster version
@@ -170,8 +168,7 @@ public class InsertTabletPlan extends InsertPlan {
}
private void writeMeasurements(DataOutputStream stream) throws IOException {
- stream.writeInt(
- measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+ stream.writeInt(measurements.length - getFailedMeasurementNumber());
for (String m : measurements) {
if (m == null) {
continue;
@@ -181,13 +178,12 @@ public class InsertTabletPlan extends InsertPlan {
}
private void writeDataTypes(DataOutputStream stream) throws IOException {
- stream.writeInt(dataTypes.length);
+ stream.writeInt(dataTypes.length - getFailedMeasurementNumber());
for (int i = 0; i < dataTypes.length; i++) {
if (columns[i] == null) {
continue;
}
- TSDataType dataType = dataTypes[i];
- stream.write(dataType.serialize());
+ dataTypes[i].serializeTo(stream);
}
}
@@ -198,37 +194,36 @@ public class InsertTabletPlan extends InsertPlan {
stream.writeInt(rowCount);
}
- if (timeBuffer == null) {
- if (isExecuting) {
- for (int i = start; i < end; i++) {
- stream.writeLong(times[i]);
- }
- } else {
- for (long time : times) {
- stream.writeLong(time);
- }
+ if (isExecuting) {
+ for (int i = start; i < end; i++) {
+ stream.writeLong(times[i]);
}
} else {
- stream.write(timeBuffer.array());
- timeBuffer = null;
+ for (long time : times) {
+ stream.writeLong(time);
+ }
}
}
private void writeBitMaps(DataOutputStream stream) throws IOException {
stream.writeBoolean(bitMaps != null);
if (bitMaps != null) {
- for (BitMap bitMap : bitMaps) {
- if (bitMap == null) {
+ for (int i = 0; i < bitMaps.length; ++i) {
+ if (columns[i] == null) {
+ continue;
+ }
+
+ if (bitMaps[i] == null) {
stream.writeBoolean(false);
} else {
stream.writeBoolean(true);
if (isExecuting) {
int len = end - start;
BitMap partBitMap = new BitMap(len);
- BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+ BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
stream.write(partBitMap.getByteArray());
} else {
- stream.write(bitMap.getByteArray());
+ stream.write(bitMaps[i].getByteArray());
}
}
}
@@ -236,13 +231,7 @@ public class InsertTabletPlan extends InsertPlan {
}
private void writeValues(DataOutputStream stream) throws IOException {
- if (valueBuffer == null) {
- serializeValues(stream);
- } else {
- stream.write(valueBuffer.array());
- valueBuffer = null;
- }
-
+ serializeValues(stream);
stream.writeLong(index);
}
@@ -264,8 +253,7 @@ public class InsertTabletPlan extends InsertPlan {
}
private void writeMeasurements(ByteBuffer buffer) {
- buffer.putInt(
- measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+ buffer.putInt(measurements.length - getFailedMeasurementNumber());
for (String m : measurements) {
if (m != null) {
putString(buffer, m);
@@ -274,13 +262,12 @@ public class InsertTabletPlan extends InsertPlan {
}
private void writeDataTypes(ByteBuffer buffer) {
- buffer.putInt(dataTypes.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
- for (int i = 0, dataTypesLength = dataTypes.length; i < dataTypesLength; i++) {
- TSDataType dataType = dataTypes[i];
+ buffer.putInt(dataTypes.length - getFailedMeasurementNumber());
+ for (int i = 0; i < dataTypes.length; i++) {
if (columns[i] == null) {
continue;
}
- dataType.serializeTo(buffer);
+ dataTypes[i].serializeTo(buffer);
}
}
@@ -291,37 +278,36 @@ public class InsertTabletPlan extends InsertPlan {
buffer.putInt(rowCount);
}
- if (timeBuffer == null) {
- if (isExecuting) {
- for (int i = start; i < end; i++) {
- buffer.putLong(times[i]);
- }
- } else {
- for (long time : times) {
- buffer.putLong(time);
- }
+ if (isExecuting) {
+ for (int i = start; i < end; i++) {
+ buffer.putLong(times[i]);
}
} else {
- buffer.put(timeBuffer.array());
- timeBuffer = null;
+ for (long time : times) {
+ buffer.putLong(time);
+ }
}
}
private void writeBitMaps(ByteBuffer buffer) {
buffer.put(BytesUtils.boolToByte(bitMaps != null));
if (bitMaps != null) {
- for (BitMap bitMap : bitMaps) {
- if (bitMap == null) {
+ for (int i = 0; i < bitMaps.length; i++) {
+ if (columns[i] == null) {
+ continue;
+ }
+
+ if (bitMaps[i] == null) {
buffer.put(BytesUtils.boolToByte(false));
} else {
buffer.put(BytesUtils.boolToByte(true));
if (isExecuting) {
int len = end - start;
BitMap partBitMap = new BitMap(len);
- BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+ BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
buffer.put(partBitMap.getByteArray());
} else {
- buffer.put(bitMap.getByteArray());
+ buffer.put(bitMaps[i].getByteArray());
}
}
}
@@ -329,18 +315,12 @@ public class InsertTabletPlan extends InsertPlan {
}
private void writeValues(ByteBuffer buffer) {
- if (valueBuffer == null) {
- serializeValues(buffer);
- } else {
- buffer.put(valueBuffer.array());
- valueBuffer = null;
- }
-
+ serializeValues(buffer);
buffer.putLong(index);
}
private void serializeValues(DataOutputStream outputStream) throws IOException {
- for (int i = 0; i < dataTypes.length; i++) {
+ for (int i = 0; i < columns.length; i++) {
if (columns[i] == null) {
continue;
}
@@ -349,7 +329,7 @@ public class InsertTabletPlan extends InsertPlan {
}
private void serializeValues(ByteBuffer buffer) {
- for (int i = 0; i < dataTypes.length; i++) {
+ for (int i = 0; i < columns.length; i++) {
if (columns[i] == null) {
continue;
}
@@ -452,16 +432,6 @@ public class InsertTabletPlan extends InsertPlan {
}
}
- public void setTimeBuffer(ByteBuffer timeBuffer) {
- this.timeBuffer = timeBuffer;
- this.timeBuffer.position(0);
- }
-
- public void setValueBuffer(ByteBuffer valueBuffer) {
- this.valueBuffer = valueBuffer;
- this.timeBuffer.position(0);
- }
-
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
this.devicePath = new PartialPath(readString(buffer));
@@ -649,8 +619,6 @@ public class InsertTabletPlan extends InsertPlan {
return rowCount == that.rowCount
&& Objects.equals(devicePath, that.devicePath)
&& Arrays.equals(times, that.times)
- && Objects.equals(timeBuffer, that.timeBuffer)
- && Objects.equals(valueBuffer, that.valueBuffer)
&& Objects.equals(paths, that.paths)
&& Objects.equals(range, that.range)
&& Objects.equals(isAligned, that.isAligned);
@@ -658,7 +626,7 @@ public class InsertTabletPlan extends InsertPlan {
@Override
public int hashCode() {
- int result = Objects.hash(timeBuffer, valueBuffer, rowCount, paths, range);
+ int result = Objects.hash(rowCount, paths, range);
result = 31 * result + Arrays.hashCode(times);
return result;
}