You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/10/26 13:43:06 UTC

[iotdb] branch master updated: [IOTDB-4757][IOTDB-4754]Fix concurrent register schema bug and show timeseries null element (#7728)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dda1ab202 [IOTDB-4757][IOTDB-4754]Fix concurrent register schema bug and show timeseries null element (#7728)
3dda1ab202 is described below

commit 3dda1ab202b89747ac0baf851eb47ec34cd3a6d4
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Wed Oct 26 21:42:59 2022 +0800

    [IOTDB-4757][IOTDB-4754]Fix concurrent register schema bug and show timeseries null element (#7728)
    
    [IOTDB-4757][IOTDB-4754]Fix concurrent register schema bug and show timeseries null element (#7728)
---
 client-py/tests/test_dataframe.py                  |   4 +-
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  | 247 +++++++++++++--------
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  | 128 +++++++----
 .../schema/TimeSeriesSchemaScanOperator.java       |   9 +-
 4 files changed, 243 insertions(+), 145 deletions(-)

diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py
index f4f97a0b98..c7cce58ea5 100644
--- a/client-py/tests/test_dataframe.py
+++ b/client-py/tests/test_dataframe.py
@@ -78,8 +78,8 @@ def test_non_time_query():
                 "FLOAT",
                 "GORILLA",
                 "SNAPPY",
-                "null",
-                "null",
+                None,
+                None,
             ]
         ],
     )
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 650bdb97fe..9b9d38e58a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -184,64 +184,75 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
       throw new IllegalPathException(path.getFullPath());
     }
     MetaFormatUtils.checkTimeseries(path);
-    Pair<IMNode, Template> pair = checkAndAutoCreateInternalPath(path.getDevicePath());
-    IMNode device = pair.left;
+    PartialPath devicePath = path.getDevicePath();
+    Pair<IMNode, Template> pair = checkAndAutoCreateInternalPath(devicePath);
+    IMNode deviceParent = pair.left;
     Template upperTemplate = pair.right;
 
     try {
-      MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
-
-      String leafName = path.getMeasurement();
-
       // synchronize check and add, we need addChild and add Alias become atomic operation
       // only write on mtree will be synchronized
       synchronized (this) {
-        if (alias != null && store.hasChild(device, alias)) {
-          throw new AliasAlreadyExistException(path.getFullPath(), alias);
-        }
+        pair = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent, upperTemplate);
+        IMNode device = pair.left;
+        upperTemplate = pair.right;
 
-        if (store.hasChild(device, leafName)) {
-          throw new PathAlreadyExistException(path.getFullPath());
-        }
+        try {
+          MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
 
-        if (upperTemplate != null
-            && (upperTemplate.getDirectNode(leafName) != null
-                || upperTemplate.getDirectNode(alias) != null)) {
-          throw new TemplateImcompatibeException(path.getFullPath(), upperTemplate.getName());
-        }
+          String leafName = path.getMeasurement();
 
-        if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
-          throw new AlignedTimeseriesException(
-              "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
-              device.getFullPath());
-        }
+          if (alias != null && store.hasChild(device, alias)) {
+            throw new AliasAlreadyExistException(path.getFullPath(), alias);
+          }
 
-        IEntityMNode entityMNode;
-        if (device.isEntity()) {
-          entityMNode = device.getAsEntityMNode();
-        } else {
-          entityMNode = store.setToEntity(device);
-          if (entityMNode.isStorageGroup()) {
-            this.storageGroupMNode = entityMNode.getAsStorageGroupMNode();
+          if (store.hasChild(device, leafName)) {
+            throw new PathAlreadyExistException(path.getFullPath());
+          }
+
+          if (upperTemplate != null
+              && (upperTemplate.getDirectNode(leafName) != null
+                  || upperTemplate.getDirectNode(alias) != null)) {
+            throw new TemplateImcompatibeException(path.getFullPath(), upperTemplate.getName());
+          }
+
+          if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
+            throw new AlignedTimeseriesException(
+                "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+                device.getFullPath());
+          }
+
+          IEntityMNode entityMNode;
+          if (device.isEntity()) {
+            entityMNode = device.getAsEntityMNode();
+          } else {
+            entityMNode = store.setToEntity(device);
+            if (entityMNode.isStorageGroup()) {
+              this.storageGroupMNode = entityMNode.getAsStorageGroupMNode();
+            }
+            device = entityMNode;
           }
-          device = entityMNode;
-        }
 
-        IMeasurementMNode measurementMNode =
-            MeasurementMNode.getMeasurementMNode(
-                entityMNode,
-                leafName,
-                new MeasurementSchema(leafName, dataType, encoding, compressor, props),
-                alias);
-        store.addChild(entityMNode, leafName, measurementMNode);
-        // link alias to LeafMNode
-        if (alias != null) {
-          entityMNode.addAlias(alias, measurementMNode);
+          IMeasurementMNode measurementMNode =
+              MeasurementMNode.getMeasurementMNode(
+                  entityMNode,
+                  leafName,
+                  new MeasurementSchema(leafName, dataType, encoding, compressor, props),
+                  alias);
+          store.addChild(entityMNode, leafName, measurementMNode);
+          // link alias to LeafMNode
+          if (alias != null) {
+            entityMNode.addAlias(alias, measurementMNode);
+          }
+          return measurementMNode;
+        } finally {
+          unPinMNode(device);
         }
-        return measurementMNode;
       }
     } finally {
-      unPinMNode(device);
+      if (deviceParent != null) {
+        unPinMNode(deviceParent);
+      }
     }
   }
 
@@ -267,71 +278,84 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
     List<IMeasurementMNode> measurementMNodeList = new ArrayList<>();
     MetaFormatUtils.checkSchemaMeasurementNames(measurements);
     Pair<IMNode, Template> pair = checkAndAutoCreateInternalPath(devicePath);
-    IMNode device = pair.left;
+    IMNode deviceParent = pair.left;
     Template upperTemplate = pair.right;
 
     try {
       // synchronize check and add, we need addChild operation be atomic.
       // only write operations on mtree will be synchronized
       synchronized (this) {
-        for (int i = 0; i < measurements.size(); i++) {
-          if (store.hasChild(device, measurements.get(i))) {
-            throw new PathAlreadyExistException(
-                devicePath.getFullPath() + "." + measurements.get(i));
-          }
-          if (aliasList != null
-              && aliasList.get(i) != null
-              && store.hasChild(device, aliasList.get(i))) {
-            throw new AliasAlreadyExistException(
-                devicePath.getFullPath() + "." + measurements.get(i), aliasList.get(i));
+        pair = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent, upperTemplate);
+        IMNode device = pair.left;
+        upperTemplate = pair.right;
+
+        try {
+          for (int i = 0; i < measurements.size(); i++) {
+            if (store.hasChild(device, measurements.get(i))) {
+              throw new PathAlreadyExistException(
+                  devicePath.getFullPath() + "." + measurements.get(i));
+            }
+            if (aliasList != null
+                && aliasList.get(i) != null
+                && store.hasChild(device, aliasList.get(i))) {
+              throw new AliasAlreadyExistException(
+                  devicePath.getFullPath() + "." + measurements.get(i), aliasList.get(i));
+            }
           }
-        }
-      }
 
-      if (upperTemplate != null) {
-        for (String measurement : measurements) {
-          if (upperTemplate.getDirectNode(measurement) != null) {
-            throw new TemplateImcompatibeException(
-                devicePath.concatNode(measurement).getFullPath(), upperTemplate.getName());
+          if (upperTemplate != null) {
+            for (String measurement : measurements) {
+              if (upperTemplate.getDirectNode(measurement) != null) {
+                throw new TemplateImcompatibeException(
+                    devicePath.concatNode(measurement).getFullPath(), upperTemplate.getName());
+              }
+            }
           }
-        }
-      }
 
-      if (device.isEntity() && !device.getAsEntityMNode().isAligned()) {
-        throw new AlignedTimeseriesException(
-            "Timeseries under this entity is not aligned, please use createTimeseries or change entity.",
-            devicePath.getFullPath());
-      }
+          if (device.isEntity() && !device.getAsEntityMNode().isAligned()) {
+            throw new AlignedTimeseriesException(
+                "Timeseries under this entity is not aligned, please use createTimeseries or change entity.",
+                devicePath.getFullPath());
+          }
 
-      IEntityMNode entityMNode;
-      if (device.isEntity()) {
-        entityMNode = device.getAsEntityMNode();
-      } else {
-        entityMNode = store.setToEntity(device);
-        entityMNode.setAligned(true);
-        if (entityMNode.isStorageGroup()) {
-          this.storageGroupMNode = entityMNode.getAsStorageGroupMNode();
-        }
-        device = entityMNode;
-      }
+          IEntityMNode entityMNode;
+          if (device.isEntity()) {
+            entityMNode = device.getAsEntityMNode();
+          } else {
+            entityMNode = store.setToEntity(device);
+            entityMNode.setAligned(true);
+            if (entityMNode.isStorageGroup()) {
+              this.storageGroupMNode = entityMNode.getAsStorageGroupMNode();
+            }
+            device = entityMNode;
+          }
 
-      for (int i = 0; i < measurements.size(); i++) {
-        IMeasurementMNode measurementMNode =
-            MeasurementMNode.getMeasurementMNode(
-                entityMNode,
-                measurements.get(i),
-                new MeasurementSchema(
-                    measurements.get(i), dataTypes.get(i), encodings.get(i), compressors.get(i)),
-                aliasList == null ? null : aliasList.get(i));
-        store.addChild(entityMNode, measurements.get(i), measurementMNode);
-        if (aliasList != null && aliasList.get(i) != null) {
-          entityMNode.addAlias(aliasList.get(i), measurementMNode);
+          for (int i = 0; i < measurements.size(); i++) {
+            IMeasurementMNode measurementMNode =
+                MeasurementMNode.getMeasurementMNode(
+                    entityMNode,
+                    measurements.get(i),
+                    new MeasurementSchema(
+                        measurements.get(i),
+                        dataTypes.get(i),
+                        encodings.get(i),
+                        compressors.get(i)),
+                    aliasList == null ? null : aliasList.get(i));
+            store.addChild(entityMNode, measurements.get(i), measurementMNode);
+            if (aliasList != null && aliasList.get(i) != null) {
+              entityMNode.addAlias(aliasList.get(i), measurementMNode);
+            }
+            measurementMNodeList.add(measurementMNode);
+          }
+          return measurementMNodeList;
+        } finally {
+          unPinMNode(device);
         }
-        measurementMNodeList.add(measurementMNode);
       }
-      return measurementMNodeList;
     } finally {
-      unPinMNode(device);
+      if (deviceParent != null) {
+        unPinMNode(deviceParent);
+      }
     }
   }
 
@@ -400,13 +424,16 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
       throws MetadataException {
     String[] nodeNames = devicePath.getNodes();
     MetaFormatUtils.checkTimeseries(devicePath);
+    if (nodeNames.length == levelOfSG + 1) {
+      return new Pair<>(null, null);
+    }
     IMNode cur = storageGroupMNode;
     IMNode child;
     String childName;
     Template upperTemplate = cur.getSchemaTemplate();
     try {
-      // e.g, path = root.sg.d1.s1,  create internal nodes and set cur to d1 node
-      for (int i = levelOfSG + 1; i < nodeNames.length; i++) {
+      // e.g, path = root.sg.d1.s1,  create internal nodes and set cur to sg node, parent of d1
+      for (int i = levelOfSG + 1; i < nodeNames.length - 1; i++) {
         childName = nodeNames[i];
         child = store.getChild(cur, childName);
         if (child == null) {
@@ -433,6 +460,36 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
     }
   }
 
+  private Pair<IMNode, Template> checkAndAutoCreateDeviceNode(
+      String deviceName, IMNode deviceParent, Template upperTemplate) throws MetadataException {
+    if (deviceParent == null) {
+      // device is sg
+      pinMNode(storageGroupMNode);
+      return new Pair<>(storageGroupMNode, null);
+    }
+    IMNode device = store.getChild(deviceParent, deviceName);
+    if (device == null) {
+      if (upperTemplate != null && upperTemplate.getDirectNode(deviceName) != null) {
+        throw new TemplateImcompatibeException(
+            deviceParent.getPartialPath().concatNode(deviceName).getFullPath(),
+            upperTemplate.getName(),
+            deviceName);
+      }
+      device =
+          store.addChild(deviceParent, deviceName, new InternalMNode(deviceParent, deviceName));
+    }
+
+    if (device.isMeasurement()) {
+      throw new PathAlreadyExistException(device.getFullPath());
+    }
+
+    if (device.getSchemaTemplate() != null) {
+      upperTemplate = device.getSchemaTemplate();
+    }
+
+    return new Pair<>(device, upperTemplate);
+  }
+
   /**
    * Delete path. The path should be a full path from root to leaf node
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 0f5f95c155..f1befa38d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -203,17 +203,22 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
       throw new IllegalPathException(path.getFullPath());
     }
     MetaFormatUtils.checkTimeseries(path);
-    Pair<IMNode, Template> pair = checkAndAutoCreateInternalPath(path.getDevicePath());
-    IMNode device = pair.left;
+    PartialPath devicePath = path.getDevicePath();
+    Pair<IMNode, Template> pair = checkAndAutoCreateInternalPath(devicePath);
+    IMNode deviceParent = pair.left;
     Template upperTemplate = pair.right;
 
-    MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
-
-    String leafName = path.getMeasurement();
-
     // synchronize check and add, we need addChild and add Alias become atomic operation
     // only write on mtree will be synchronized
     synchronized (this) {
+      pair = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent, upperTemplate);
+      IMNode device = pair.left;
+      upperTemplate = pair.right;
+
+      MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
+
+      String leafName = path.getMeasurement();
+
       if (alias != null && device.hasChild(alias)) {
         throw new AliasAlreadyExistException(path.getFullPath(), alias);
       }
@@ -291,12 +296,16 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
     List<IMeasurementMNode> measurementMNodeList = new ArrayList<>();
     MetaFormatUtils.checkSchemaMeasurementNames(measurements);
     Pair<IMNode, Template> pair = checkAndAutoCreateInternalPath(devicePath);
-    IMNode device = pair.left;
+    IMNode deviceParent = pair.left;
     Template upperTemplate = pair.right;
 
     // synchronize check and add, we need addChild operation be atomic.
     // only write operations on mtree will be synchronized
     synchronized (this) {
+      pair = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent, upperTemplate);
+      IMNode device = pair.left;
+      upperTemplate = pair.right;
+
       for (int i = 0; i < measurements.size(); i++) {
         if (device.hasChild(measurements.get(i))) {
           IMNode node = device.getChild(measurements.get(i));
@@ -318,61 +327,64 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
               devicePath.getFullPath() + "." + measurements.get(i), aliasList.get(i));
         }
       }
-    }
 
-    if (upperTemplate != null) {
-      for (String measurement : measurements) {
-        if (upperTemplate.getDirectNode(measurement) != null) {
-          throw new TemplateImcompatibeException(
-              devicePath.concatNode(measurement).getFullPath(), upperTemplate.getName());
+      if (upperTemplate != null) {
+        for (String measurement : measurements) {
+          if (upperTemplate.getDirectNode(measurement) != null) {
+            throw new TemplateImcompatibeException(
+                devicePath.concatNode(measurement).getFullPath(), upperTemplate.getName());
+          }
         }
       }
-    }
 
-    if (device.isEntity() && !device.getAsEntityMNode().isAligned()) {
-      throw new AlignedTimeseriesException(
-          "Timeseries under this entity is not aligned, please use createTimeseries or change entity.",
-          devicePath.getFullPath());
-    }
+      if (device.isEntity() && !device.getAsEntityMNode().isAligned()) {
+        throw new AlignedTimeseriesException(
+            "Timeseries under this entity is not aligned, please use createTimeseries or change entity.",
+            devicePath.getFullPath());
+      }
 
-    IEntityMNode entityMNode;
-    if (device.isEntity()) {
-      entityMNode = device.getAsEntityMNode();
-    } else {
-      entityMNode = store.setToEntity(device);
-      entityMNode.setAligned(true);
-      if (entityMNode.isStorageGroup()) {
-        this.storageGroupMNode = entityMNode.getAsStorageGroupMNode();
+      IEntityMNode entityMNode;
+      if (device.isEntity()) {
+        entityMNode = device.getAsEntityMNode();
+      } else {
+        entityMNode = store.setToEntity(device);
+        entityMNode.setAligned(true);
+        if (entityMNode.isStorageGroup()) {
+          this.storageGroupMNode = entityMNode.getAsStorageGroupMNode();
+        }
       }
-    }
 
-    for (int i = 0; i < measurements.size(); i++) {
-      IMeasurementMNode measurementMNode =
-          MeasurementMNode.getMeasurementMNode(
-              entityMNode,
-              measurements.get(i),
-              new MeasurementSchema(
-                  measurements.get(i), dataTypes.get(i), encodings.get(i), compressors.get(i)),
-              aliasList == null ? null : aliasList.get(i));
-      store.addChild(entityMNode, measurements.get(i), measurementMNode);
-      if (aliasList != null && aliasList.get(i) != null) {
-        entityMNode.addAlias(aliasList.get(i), measurementMNode);
+      for (int i = 0; i < measurements.size(); i++) {
+        IMeasurementMNode measurementMNode =
+            MeasurementMNode.getMeasurementMNode(
+                entityMNode,
+                measurements.get(i),
+                new MeasurementSchema(
+                    measurements.get(i), dataTypes.get(i), encodings.get(i), compressors.get(i)),
+                aliasList == null ? null : aliasList.get(i));
+        store.addChild(entityMNode, measurements.get(i), measurementMNode);
+        if (aliasList != null && aliasList.get(i) != null) {
+          entityMNode.addAlias(aliasList.get(i), measurementMNode);
+        }
+        measurementMNodeList.add(measurementMNode);
       }
-      measurementMNodeList.add(measurementMNode);
+      return measurementMNodeList;
     }
-    return measurementMNodeList;
   }
 
   private Pair<IMNode, Template> checkAndAutoCreateInternalPath(PartialPath devicePath)
       throws MetadataException {
     String[] nodeNames = devicePath.getNodes();
     MetaFormatUtils.checkTimeseries(devicePath);
+    if (nodeNames.length == levelOfSG + 1) {
+      return new Pair<>(null, null);
+    }
     IMNode cur = storageGroupMNode;
     IMNode child;
     String childName;
     Template upperTemplate = cur.getSchemaTemplate();
-    // e.g, path = root.sg.d1.s1,  create internal nodes and set cur to d1 node
-    for (int i = levelOfSG + 1; i < nodeNames.length; i++) {
+    // e.g, path = root.sg.d1.s1,  create internal nodes and set cur to sg node, parent of d1
+    for (int i = levelOfSG + 1; i < nodeNames.length - 1; i++) {
       childName = nodeNames[i];
       child = cur.getChild(childName);
       if (child == null) {
@@ -395,6 +407,36 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
     return new Pair<>(cur, upperTemplate);
   }
 
+  private Pair<IMNode, Template> checkAndAutoCreateDeviceNode(
+      String deviceName, IMNode deviceParent, Template upperTemplate)
+      throws PathAlreadyExistException, TemplateImcompatibeException {
+    if (deviceParent == null) {
+      // device is sg
+      return new Pair<>(storageGroupMNode, null);
+    }
+    IMNode device = store.getChild(deviceParent, deviceName);
+    if (device == null) {
+      if (upperTemplate != null && upperTemplate.getDirectNode(deviceName) != null) {
+        throw new TemplateImcompatibeException(
+            deviceParent.getPartialPath().concatNode(deviceName).getFullPath(),
+            upperTemplate.getName(),
+            deviceName);
+      }
+      device =
+          store.addChild(deviceParent, deviceName, new InternalMNode(deviceParent, deviceName));
+    }
+
+    if (device.isMeasurement()) {
+      throw new PathAlreadyExistException(device.getFullPath());
+    }
+
+    if (device.getSchemaTemplate() != null) {
+      upperTemplate = device.getSchemaTemplate();
+    }
+
+    return new Pair<>(device, upperTemplate);
+  }
+
   @Override
   public Map<Integer, MetadataException> checkMeasurementExistence(
       PartialPath devicePath, List<String> measurementList, List<String> aliasList) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
index 7fbc42f88f..e81eb40bd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
@@ -125,14 +125,13 @@ public class TimeSeriesSchemaScanOperator extends SchemaQueryScanOperator {
   }
 
   private String mapToString(Map<String, String> map) {
+    if (map == null || map.isEmpty()) {
+      return null;
+    }
     String content =
         map.entrySet().stream()
             .map(e -> "\"" + e.getKey() + "\"" + ":" + "\"" + e.getValue() + "\"")
             .collect(Collectors.joining(","));
-    if (content.isEmpty()) {
-      return "null";
-    } else {
-      return "{" + content + "}";
-    }
+    return "{" + content + "}";
   }
 }