You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/26 14:49:01 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-2983] Serialization error in Partial insert (#5673)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new ee6ae34f8b [To rel/0.13][IOTDB-2983] Serialization error in Partial insert (#5673)
ee6ae34f8b is described below

commit ee6ae34f8bedf577be69a334003a25c09fb19240
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Tue Apr 26 22:48:54 2022 +0800

    [To rel/0.13][IOTDB-2983] Serialization error in Partial insert (#5673)
---
 .../db/integration/IoTDBPartialInsertionIT.java    |  62 ++++++++++++
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  12 +--
 .../db/qp/physical/crud/InsertTabletPlan.java      | 112 ++++++++-------------
 3 files changed, 106 insertions(+), 80 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBPartialInsertionIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBPartialInsertionIT.java
index 11fae494e9..b8d8789edf 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBPartialInsertionIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBPartialInsertionIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.integration;
 
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.integration.env.ConfigFactory;
 import org.apache.iotdb.integration.env.EnvFactory;
 import org.apache.iotdb.itbase.category.ClusterTest;
@@ -25,19 +27,26 @@ import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 import org.apache.iotdb.jdbc.IoTDBSQLException;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @Category({LocalStandaloneTest.class, ClusterTest.class})
 public class IoTDBPartialInsertionIT {
+  private final Logger logger = LoggerFactory.getLogger(IoTDBPartialInsertionIT.class);
 
   @Before
   public void setUp() throws Exception {
@@ -66,4 +75,57 @@ public class IoTDBPartialInsertionIT {
       }
     }
   }
+
+  @Test
+  public void testPartialInsertionRestart() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      statement.execute("SET STORAGE GROUP TO root.sg");
+      statement.execute("CREATE TIMESERIES root.sg.d1.s1 datatype=text");
+      statement.execute("CREATE TIMESERIES root.sg.d1.s2 datatype=double");
+
+      try {
+        statement.execute("INSERT INTO root.sg.d1(time,s1,s2) VALUES(100,'test','test')");
+      } catch (IoTDBSQLException e) {
+        // ignore
+      }
+    }
+
+    long time = 0;
+    try {
+      EnvironmentUtils.restartDaemon();
+      StorageEngine.getInstance().recover();
+      // wait for recover
+      while (!StorageEngine.getInstance().isAllSgReady()) {
+        Thread.sleep(500);
+        time += 500;
+        if (time > 10000) {
+          logger.warn("wait too long in restart, wait for: " + time / 1000 + "s");
+        }
+      }
+    } catch (Exception e) {
+      Assert.fail();
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("SELECT s1 FROM root.sg.d1");
+      assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+          assertEquals("test", resultSet.getString("root.sg.d1.s1"));
+        }
+        assertEquals(1, cnt);
+      }
+      hasResultSet = statement.execute("SELECT s2 FROM root.sg.d1");
+      assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        assertFalse(resultSet.next());
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index ee263cca21..85693d2879 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -342,8 +342,7 @@ public class InsertRowPlan extends InsertPlan {
   }
 
   void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException {
-    stream.writeInt(
-        measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+    stream.writeInt(measurements.length - getFailedMeasurementNumber());
 
     for (String m : measurements) {
       if (m != null) {
@@ -352,7 +351,7 @@ public class InsertRowPlan extends InsertPlan {
     }
 
     try {
-      stream.writeInt(dataTypes.length);
+      stream.writeInt(values.length - getFailedMeasurementNumber());
       putValues(stream);
     } catch (QueryProcessException e) {
       throw new IOException(e);
@@ -368,7 +367,6 @@ public class InsertRowPlan extends InsertPlan {
   private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
     for (int i = 0; i < values.length; i++) {
       if (values[i] == null) {
-        ReadWriteIOUtils.write(TYPE_NULL, outputStream);
         continue;
       }
       // types are not determined, the situation mainly occurs when the plan uses string values
@@ -407,7 +405,6 @@ public class InsertRowPlan extends InsertPlan {
   private void putValues(ByteBuffer buffer) throws QueryProcessException {
     for (int i = 0; i < values.length; i++) {
       if (values[i] == null) {
-        ReadWriteIOUtils.write(TYPE_NULL, buffer);
         continue;
       }
       // types are not determined, the situation mainly occurs when the plan uses string values
@@ -493,8 +490,7 @@ public class InsertRowPlan extends InsertPlan {
   }
 
   void serializeMeasurementsAndValues(ByteBuffer buffer) {
-    buffer.putInt(
-        measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+    buffer.putInt(measurements.length - getFailedMeasurementNumber());
 
     for (String measurement : measurements) {
       if (measurement != null) {
@@ -502,7 +498,7 @@ public class InsertRowPlan extends InsertPlan {
       }
     }
     try {
-      buffer.putInt(dataTypes.length);
+      buffer.putInt(values.length - getFailedMeasurementNumber());
       putValues(buffer);
     } catch (QueryProcessException e) {
       logger.error("Failed to serialize values for {}", this, e);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index d761506195..30ec3ebfb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -51,11 +51,9 @@ public class InsertTabletPlan extends InsertPlan {
   private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported.";
 
   private long[] times; // times should be sorted. It is done in the session API.
-  private ByteBuffer timeBuffer;
 
   private BitMap[] bitMaps;
   private Object[] columns;
-  private ByteBuffer valueBuffer;
   private int rowCount = 0;
   // indicate whether this plan has been set 'start' or 'end' in order to support plan transmission
   // without data loss in cluster version
@@ -170,8 +168,7 @@ public class InsertTabletPlan extends InsertPlan {
   }
 
   private void writeMeasurements(DataOutputStream stream) throws IOException {
-    stream.writeInt(
-        measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+    stream.writeInt(measurements.length - getFailedMeasurementNumber());
     for (String m : measurements) {
       if (m == null) {
         continue;
@@ -181,13 +178,12 @@ public class InsertTabletPlan extends InsertPlan {
   }
 
   private void writeDataTypes(DataOutputStream stream) throws IOException {
-    stream.writeInt(dataTypes.length);
+    stream.writeInt(dataTypes.length - getFailedMeasurementNumber());
     for (int i = 0; i < dataTypes.length; i++) {
       if (columns[i] == null) {
         continue;
       }
-      TSDataType dataType = dataTypes[i];
-      stream.write(dataType.serialize());
+      dataTypes[i].serializeTo(stream);
     }
   }
 
@@ -198,37 +194,36 @@ public class InsertTabletPlan extends InsertPlan {
       stream.writeInt(rowCount);
     }
 
-    if (timeBuffer == null) {
-      if (isExecuting) {
-        for (int i = start; i < end; i++) {
-          stream.writeLong(times[i]);
-        }
-      } else {
-        for (long time : times) {
-          stream.writeLong(time);
-        }
+    if (isExecuting) {
+      for (int i = start; i < end; i++) {
+        stream.writeLong(times[i]);
       }
     } else {
-      stream.write(timeBuffer.array());
-      timeBuffer = null;
+      for (long time : times) {
+        stream.writeLong(time);
+      }
     }
   }
 
   private void writeBitMaps(DataOutputStream stream) throws IOException {
     stream.writeBoolean(bitMaps != null);
     if (bitMaps != null) {
-      for (BitMap bitMap : bitMaps) {
-        if (bitMap == null) {
+      for (int i = 0; i < bitMaps.length; ++i) {
+        if (columns[i] == null) {
+          continue;
+        }
+
+        if (bitMaps[i] == null) {
           stream.writeBoolean(false);
         } else {
           stream.writeBoolean(true);
           if (isExecuting) {
             int len = end - start;
             BitMap partBitMap = new BitMap(len);
-            BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+            BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
             stream.write(partBitMap.getByteArray());
           } else {
-            stream.write(bitMap.getByteArray());
+            stream.write(bitMaps[i].getByteArray());
           }
         }
       }
@@ -236,13 +231,7 @@ public class InsertTabletPlan extends InsertPlan {
   }
 
   private void writeValues(DataOutputStream stream) throws IOException {
-    if (valueBuffer == null) {
-      serializeValues(stream);
-    } else {
-      stream.write(valueBuffer.array());
-      valueBuffer = null;
-    }
-
+    serializeValues(stream);
     stream.writeLong(index);
   }
 
@@ -264,8 +253,7 @@ public class InsertTabletPlan extends InsertPlan {
   }
 
   private void writeMeasurements(ByteBuffer buffer) {
-    buffer.putInt(
-        measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
+    buffer.putInt(measurements.length - getFailedMeasurementNumber());
     for (String m : measurements) {
       if (m != null) {
         putString(buffer, m);
@@ -274,13 +262,12 @@ public class InsertTabletPlan extends InsertPlan {
   }
 
   private void writeDataTypes(ByteBuffer buffer) {
-    buffer.putInt(dataTypes.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
-    for (int i = 0, dataTypesLength = dataTypes.length; i < dataTypesLength; i++) {
-      TSDataType dataType = dataTypes[i];
+    buffer.putInt(dataTypes.length - getFailedMeasurementNumber());
+    for (int i = 0; i < dataTypes.length; i++) {
       if (columns[i] == null) {
         continue;
       }
-      dataType.serializeTo(buffer);
+      dataTypes[i].serializeTo(buffer);
     }
   }
 
@@ -291,37 +278,36 @@ public class InsertTabletPlan extends InsertPlan {
       buffer.putInt(rowCount);
     }
 
-    if (timeBuffer == null) {
-      if (isExecuting) {
-        for (int i = start; i < end; i++) {
-          buffer.putLong(times[i]);
-        }
-      } else {
-        for (long time : times) {
-          buffer.putLong(time);
-        }
+    if (isExecuting) {
+      for (int i = start; i < end; i++) {
+        buffer.putLong(times[i]);
       }
     } else {
-      buffer.put(timeBuffer.array());
-      timeBuffer = null;
+      for (long time : times) {
+        buffer.putLong(time);
+      }
     }
   }
 
   private void writeBitMaps(ByteBuffer buffer) {
     buffer.put(BytesUtils.boolToByte(bitMaps != null));
     if (bitMaps != null) {
-      for (BitMap bitMap : bitMaps) {
-        if (bitMap == null) {
+      for (int i = 0; i < bitMaps.length; i++) {
+        if (columns[i] == null) {
+          continue;
+        }
+
+        if (bitMaps[i] == null) {
           buffer.put(BytesUtils.boolToByte(false));
         } else {
           buffer.put(BytesUtils.boolToByte(true));
           if (isExecuting) {
             int len = end - start;
             BitMap partBitMap = new BitMap(len);
-            BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+            BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
             buffer.put(partBitMap.getByteArray());
           } else {
-            buffer.put(bitMap.getByteArray());
+            buffer.put(bitMaps[i].getByteArray());
           }
         }
       }
@@ -329,18 +315,12 @@ public class InsertTabletPlan extends InsertPlan {
   }
 
   private void writeValues(ByteBuffer buffer) {
-    if (valueBuffer == null) {
-      serializeValues(buffer);
-    } else {
-      buffer.put(valueBuffer.array());
-      valueBuffer = null;
-    }
-
+    serializeValues(buffer);
     buffer.putLong(index);
   }
 
   private void serializeValues(DataOutputStream outputStream) throws IOException {
-    for (int i = 0; i < dataTypes.length; i++) {
+    for (int i = 0; i < columns.length; i++) {
       if (columns[i] == null) {
         continue;
       }
@@ -349,7 +329,7 @@ public class InsertTabletPlan extends InsertPlan {
   }
 
   private void serializeValues(ByteBuffer buffer) {
-    for (int i = 0; i < dataTypes.length; i++) {
+    for (int i = 0; i < columns.length; i++) {
       if (columns[i] == null) {
         continue;
       }
@@ -452,16 +432,6 @@ public class InsertTabletPlan extends InsertPlan {
     }
   }
 
-  public void setTimeBuffer(ByteBuffer timeBuffer) {
-    this.timeBuffer = timeBuffer;
-    this.timeBuffer.position(0);
-  }
-
-  public void setValueBuffer(ByteBuffer valueBuffer) {
-    this.valueBuffer = valueBuffer;
-    this.timeBuffer.position(0);
-  }
-
   @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     this.devicePath = new PartialPath(readString(buffer));
@@ -649,8 +619,6 @@ public class InsertTabletPlan extends InsertPlan {
     return rowCount == that.rowCount
         && Objects.equals(devicePath, that.devicePath)
         && Arrays.equals(times, that.times)
-        && Objects.equals(timeBuffer, that.timeBuffer)
-        && Objects.equals(valueBuffer, that.valueBuffer)
         && Objects.equals(paths, that.paths)
         && Objects.equals(range, that.range)
         && Objects.equals(isAligned, that.isAligned);
@@ -658,7 +626,7 @@ public class InsertTabletPlan extends InsertPlan {
 
   @Override
   public int hashCode() {
-    int result = Objects.hash(timeBuffer, valueBuffer, rowCount, paths, range);
+    int result = Objects.hash(rowCount, paths, range);
     result = 31 * result + Arrays.hashCode(times);
     return result;
   }