You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/16 08:35:20 UTC

[iotdb] 05/06: Fix tests

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch new_vector_writablechunkgroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 96bac68711d43a2e6887dc70bd4363f85dd01c25
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Nov 16 16:28:18 2021 +0800

    Fix tests
---
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 25 +++++++++++++------
 .../db/engine/memtable/WritableMemChunkGroup.java  | 28 ++++++++++++++++++----
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  6 +----
 .../iotdb/db/writelog/recover/LogReplayer.java     | 15 +++++++++---
 4 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 35f5910..8a4e9d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -92,7 +92,13 @@ public abstract class AbstractMemTable implements IMemTable {
   private IWritableMemChunkGroup createMemChunkGroupIfNotExistAndGet(
       String deviceId, List<IMeasurementSchema> schemaList) {
     IWritableMemChunkGroup memChunkGroup =
-        memTableMap.computeIfAbsent(deviceId, k -> new WritableMemChunkGroup(schemaList));
+        memTableMap.computeIfAbsent(
+            deviceId,
+            k -> {
+              seriesNumber++;
+              totalPointsNumThreshold += avgSeriesPointNumThreshold;
+              return new WritableMemChunkGroup(schemaList);
+            });
     for (IMeasurementSchema schema : schemaList) {
       if (!memChunkGroup.contains(schema.getMeasurementId())) {
         seriesNumber++;
@@ -105,7 +111,13 @@ public abstract class AbstractMemTable implements IMemTable {
   private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet(
       String deviceId, List<IMeasurementSchema> schemaList) {
     IWritableMemChunkGroup memChunkGroup =
-        memTableMap.computeIfAbsent(deviceId, k -> new AlignedWritableMemChunkGroup(schemaList));
+        memTableMap.computeIfAbsent(
+            deviceId,
+            k -> {
+              seriesNumber++;
+              totalPointsNumThreshold += avgSeriesPointNumThreshold;
+              return new AlignedWritableMemChunkGroup(schemaList);
+            });
     for (IMeasurementSchema schema : schemaList) {
       if (!memChunkGroup.contains(schema.getMeasurementId())) {
         seriesNumber++;
@@ -123,7 +135,7 @@ public abstract class AbstractMemTable implements IMemTable {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
     for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
-      if (insertRowPlan.getMeasurements()[i] == null) {
+      if (values[i] == null) {
         continue;
       }
       IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
@@ -143,7 +155,7 @@ public abstract class AbstractMemTable implements IMemTable {
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
     for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
-      if (insertRowPlan.getMeasurements()[i] == null) {
+      if (insertRowPlan.getValues()[i] == null) {
         continue;
       }
       IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
@@ -210,7 +222,6 @@ public abstract class AbstractMemTable implements IMemTable {
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
   @Override
   public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
-    updatePlanIndexes(insertTabletPlan.getIndex());
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
       if (insertTabletPlan.getColumns()[i] == null) {
@@ -220,7 +231,7 @@ public abstract class AbstractMemTable implements IMemTable {
       schemaList.add(schema);
     }
     IWritableMemChunkGroup memChunkGroup =
-        createAlignedMemChunkGroupIfNotExistAndGet(
+        createMemChunkGroupIfNotExistAndGet(
             insertTabletPlan.getDeviceId().getFullPath(), schemaList);
     memChunkGroup.writeValues(
         insertTabletPlan.getTimes(),
@@ -231,8 +242,8 @@ public abstract class AbstractMemTable implements IMemTable {
         end);
   }
 
+  @Override
   public void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int end) {
-    updatePlanIndexes(insertTabletPlan.getIndex());
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
       if (insertTabletPlan.getColumns()[i] == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
index cc1c93d..5845da3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -26,9 +26,21 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup {
       List<IMeasurementSchema> schemaList,
       int start,
       int end) {
-    for (int i = 0; i < schemaList.size(); i++) {
-      IWritableMemChunk memChunk = createMemChunkIfNotExistAndGet(schemaList.get(i));
-      memChunk.write(times, columns[i], bitMaps[i], schemaList.get(i).getType(), start, end);
+    int emptyColumnCount = 0;
+    for (int i = 0; i < columns.length; i++) {
+      if (columns[i] == null) {
+        emptyColumnCount++;
+        continue;
+      }
+      IWritableMemChunk memChunk =
+          createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount));
+      memChunk.write(
+          times,
+          columns[i],
+          bitMaps == null ? null : bitMaps[i],
+          schemaList.get(i - emptyColumnCount).getType(),
+          start,
+          end);
     }
   }
 
@@ -63,8 +75,14 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup {
 
   @Override
   public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
-    for (int i = 0; i < schemaList.size(); i++) {
-      IWritableMemChunk memChunk = createMemChunkIfNotExistAndGet(schemaList.get(i));
+    int emptyColumnCount = 0;
+    for (int i = 0; i < objectValue.length; i++) {
+      if (objectValue[i] == null) {
+        emptyColumnCount++;
+        continue;
+      }
+      IWritableMemChunk memChunk =
+          createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount));
       memChunk.write(insertTime, objectValue[i]);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 5fb024b..c29ebc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -241,11 +241,7 @@ public class InsertRowPlan extends InsertPlan {
           }
           continue;
         }
-        if (isAligned) {
-          dataTypes[i] = measurementMNodes[i].getSchema().getSubMeasurementsTSDataTypeList().get(i);
-        } else {
-          dataTypes[i] = measurementMNodes[i].getSchema().getType();
-        }
+        dataTypes[i] = measurementMNodes[i].getSchema().getType();
         try {
           values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
         } catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index c88ec3d..836fdfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -195,10 +195,19 @@ public class LogReplayer {
     // mark failed plan manually
     checkDataTypeAndMarkFailed(mNodes, plan);
     if (plan instanceof InsertRowPlan) {
-      recoverMemTable.insert((InsertRowPlan) plan);
+      if (plan.isAligned()) {
+        recoverMemTable.insertAlignedRow((InsertRowPlan) plan);
+      } else {
+        recoverMemTable.insert((InsertRowPlan) plan);
+      }
     } else {
-      recoverMemTable.insertTablet(
-          (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      if (plan.isAligned()) {
+        recoverMemTable.insertAlignedTablet(
+            (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      } else {
+        recoverMemTable.insertTablet(
+            (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      }
     }
   }