You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/05 04:24:01 UTC

[iotdb] branch master updated: [IOTDB-3057]Auto create schema (#5770)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c98719fb4a [IOTDB-3057]Auto create schema (#5770)
c98719fb4a is described below

commit c98719fb4a3a21709e0847fd1789296935f05077
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Thu May 5 12:23:55 2022 +0800

    [IOTDB-3057]Auto create schema (#5770)
---
 .../db/metadata/cache/DataNodeSchemaCache.java     | 151 ++++--------
 .../iotdb/db/metadata/cache/SchemaCacheEntity.java | 114 ---------
 .../iotdb/db/metadata/cache/SchemaCacheEntry.java  |  58 +++++
 .../db/mpp/common/schematree/DeviceSchemaInfo.java |   5 +-
 .../db/mpp/common/schematree/PathPatternTree.java  |   9 +-
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |  44 ++--
 .../common/schematree/node/SchemaInternalNode.java |   7 +-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 260 +++++++++++++++++++--
 .../db/metadata/cache/DataNodeSchemaCacheTest.java | 105 +++++++--
 9 files changed, 469 insertions(+), 284 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 84f79dbeac..116d2af8fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -19,53 +19,36 @@
 
 package org.apache.iotdb.db.metadata.cache;
 
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
-import org.apache.iotdb.db.mpp.plan.analyze.FakeSchemaFetcherImpl;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 /**
  * This class takes the responsibility of metadata cache management of all DataRegions under
  * StorageEngine
  */
 public class DataNodeSchemaCache {
-  private static final Logger logger = LoggerFactory.getLogger(DataNodeSchemaCache.class);
-
-  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private Cache<PartialPath, SchemaCacheEntity> schemaEntityCache;
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  // TODO use fakeSchemaFetcherImpl for test temporarily
-  private static final ISchemaFetcher schemaFetcher = new FakeSchemaFetcherImpl();
+  private final Cache<PartialPath, SchemaCacheEntry> cache;
 
   private DataNodeSchemaCache() {
-    schemaEntityCache =
-        Caffeine.newBuilder().maximumSize(config.getDataNodeSchemaCacheSize()).build();
+    cache = Caffeine.newBuilder().maximumSize(config.getDataNodeSchemaCacheSize()).build();
   }
 
   public static DataNodeSchemaCache getInstance() {
-    return DataNodeSchemaCache.DataNodeSchemaEntryCacheHolder.INSTANCE;
+    return DataNodeSchemaCacheHolder.INSTANCE;
   }
 
   /** singleton pattern. */
-  private static class DataNodeSchemaEntryCacheHolder {
+  private static class DataNodeSchemaCacheHolder {
     private static final DataNodeSchemaCache INSTANCE = new DataNodeSchemaCache();
   }
 
@@ -76,91 +59,41 @@ public class DataNodeSchemaCache {
    * @param measurements
    * @return timeseries partialPath and its SchemaEntity
    */
-  public Map<PartialPath, SchemaCacheEntity> getSchemaEntity(
-      PartialPath devicePath, String[] measurements) {
-    Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap = new HashMap<>();
-    SchemaCacheEntity schemaCacheEntity;
-    List<String> fetchMeasurements = new ArrayList<>();
+  public SchemaTree get(PartialPath devicePath, String[] measurements) {
+    SchemaTree schemaTree = new SchemaTree();
+    SchemaCacheEntry schemaCacheEntry;
     for (String measurement : measurements) {
-      PartialPath path = null;
-      try {
-        path = new PartialPath(devicePath.getFullPath(), measurement);
-      } catch (IllegalPathException e) {
-        logger.error(
-            "Create PartialPath:{} failed.",
-            devicePath.getFullPath() + TsFileConstant.PATH_SEPARATOR + measurement);
+      PartialPath path = devicePath.concatNode(measurement);
+      schemaCacheEntry = cache.getIfPresent(path);
+      if (schemaCacheEntry != null) {
+        schemaTree.appendSingleMeasurement(
+            devicePath.concatNode(
+                schemaCacheEntry.getSchemaEntryId()), // the cached path may be alias path
+            schemaCacheEntry.getMeasurementSchema(),
+            schemaCacheEntry.getAlias(),
+            schemaCacheEntry.isAligned());
       }
-      schemaCacheEntity = schemaEntityCache.getIfPresent(path);
-      if (schemaCacheEntity != null) {
-        schemaCacheEntityMap.put(path, schemaCacheEntity);
-      } else {
-        fetchMeasurements.add(measurement);
-      }
-    }
-    if (fetchMeasurements.size() != 0) {
-      SchemaTree schemaTree;
-      schemaTree = schemaFetcher.fetchSchema(new PathPatternTree(devicePath, fetchMeasurements));
-      // TODO need to construct schemaEntry from schemaTree
-
     }
-    return schemaCacheEntityMap;
+    return schemaTree;
   }
 
-  /**
-   * Get SchemaEntity info with auto create schema
-   *
-   * @param devicePath
-   * @param measurements
-   * @param tsDataTypes
-   * @param isAligned
-   * @return timeseries partialPath and its SchemaEntity
-   */
-  public Map<PartialPath, SchemaCacheEntity> getSchemaEntityWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean isAligned) {
-    Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap = new HashMap<>();
-    SchemaCacheEntity schemaCacheEntity;
-    List<String> fetchMeasurements = new ArrayList<>();
-    List<TSDataType> fetchTsDataTypes = new ArrayList<>();
-    for (int i = 0; i < measurements.length; i++) {
-      PartialPath path = null;
-      try {
-        path = new PartialPath(devicePath.getFullPath(), measurements[i]);
-      } catch (IllegalPathException e) {
-        logger.error(
-            "Create PartialPath:{} failed.",
-            devicePath.getFullPath() + TsFileConstant.PATH_SEPARATOR + measurements[i]);
-      }
-      schemaCacheEntity = schemaEntityCache.getIfPresent(path);
-      if (schemaCacheEntity != null) {
-        schemaCacheEntityMap.put(path, schemaCacheEntity);
-      } else {
-        fetchMeasurements.add(measurements[i]);
-        fetchTsDataTypes.add(tsDataTypes[i]);
-      }
-    }
-    if (fetchMeasurements.size() != 0) {
-      SchemaTree schemaTree;
-      schemaTree =
-          schemaFetcher.fetchSchemaWithAutoCreate(
-              devicePath,
-              fetchMeasurements.toArray(new String[fetchMeasurements.size()]),
-              fetchTsDataTypes.toArray(new TSDataType[fetchTsDataTypes.size()]),
-              isAligned);
-      // TODO need to construct schemaEntry from schemaTree
-
-      for (int i = 0; i < fetchMeasurements.size(); i++) {
-        try {
-          PartialPath path = new PartialPath(devicePath.getFullPath(), fetchMeasurements.get(i));
-          SchemaCacheEntity entity =
-              new SchemaCacheEntity(fetchMeasurements.get(i), fetchTsDataTypes.get(i), isAligned);
-          schemaEntityCache.put(path, entity);
-          schemaCacheEntityMap.put(path, entity);
-        } catch (IllegalPathException e) {
-          logger.error("Create PartialPath:{} failed.", devicePath.getFullPath());
-        }
+  public void put(SchemaTree schemaTree) {
+    for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
+      SchemaCacheEntry schemaCacheEntry =
+          new SchemaCacheEntry(
+              (MeasurementSchema) measurementPath.getMeasurementSchema(),
+              measurementPath.isMeasurementAliasExists()
+                  ? measurementPath.getMeasurementAlias()
+                  : null,
+              measurementPath.isUnderAlignedEntity());
+      cache.put(new PartialPath(measurementPath.getNodes()), schemaCacheEntry);
+      if (measurementPath.isMeasurementAliasExists()) {
+        // cache alias path
+        cache.put(
+            measurementPath.getDevicePath().concatNode(measurementPath.getMeasurementAlias()),
+            schemaCacheEntry);
       }
     }
-    return schemaCacheEntityMap;
   }
 
   /**
@@ -170,17 +103,15 @@ public class DataNodeSchemaCache {
    * @return
    */
   public void invalidate(PartialPath partialPath) {
-    schemaEntityCache.invalidate(partialPath);
+    cache.invalidate(partialPath);
   }
 
-  @TestOnly
-  public void cleanUp() {
-    schemaEntityCache.invalidateAll();
-    schemaEntityCache.cleanUp();
+  public long estimatedSize() {
+    return cache.estimatedSize();
   }
 
-  @TestOnly
-  protected Cache<PartialPath, SchemaCacheEntity> getSchemaEntityCache() {
-    return schemaEntityCache;
+  public void cleanUp() {
+    cache.invalidateAll();
+    cache.cleanUp();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
deleted file mode 100644
index 0e07b56e3f..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntity.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.metadata.cache;
-
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-
-public class SchemaCacheEntity {
-  private final String schemaEntryId;
-
-  private TSDataType tsDataType;
-
-  private TSEncoding tsEncoding;
-
-  private CompressionType compressionType;
-
-  private String alias;
-
-  private boolean isAligned;
-
-  @TestOnly
-  public SchemaCacheEntity() {
-    this.schemaEntryId = "1";
-  }
-
-  public SchemaCacheEntity(String schemaEntryId, TSDataType tsDataType, boolean isAligned) {
-    this.schemaEntryId = schemaEntryId;
-    this.tsDataType = tsDataType;
-    this.isAligned = isAligned;
-    this.tsEncoding =
-        TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder());
-    this.compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
-    this.alias = "";
-  }
-
-  public SchemaCacheEntity(
-      String schemaEntryId,
-      TSDataType tsDataType,
-      TSEncoding tsEncoding,
-      CompressionType compressionType,
-      String alias,
-      boolean isAligned) {
-    this.schemaEntryId = schemaEntryId;
-    this.tsDataType = tsDataType;
-    this.tsEncoding = tsEncoding;
-    this.compressionType = compressionType;
-    this.alias = alias;
-    this.isAligned = isAligned;
-  }
-
-  public String getSchemaEntryId() {
-    return schemaEntryId;
-  }
-
-  public TSDataType getTsDataType() {
-    return tsDataType;
-  }
-
-  public void setTsDataType(TSDataType tsDataType) {
-    this.tsDataType = tsDataType;
-  }
-
-  public TSEncoding getTsEncoding() {
-    return tsEncoding;
-  }
-
-  public void setTsEncoding(TSEncoding tsEncoding) {
-    this.tsEncoding = tsEncoding;
-  }
-
-  public CompressionType getCompressionType() {
-    return compressionType;
-  }
-
-  public void setCompressionType(CompressionType compressionType) {
-    this.compressionType = compressionType;
-  }
-
-  public String getAlias() {
-    return alias;
-  }
-
-  public void setAlias(String alias) {
-    this.alias = alias;
-  }
-
-  public boolean isAligned() {
-    return isAligned;
-  }
-
-  public void setAligned(boolean aligned) {
-    isAligned = aligned;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
new file mode 100644
index 0000000000..feb8f24eec
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.cache;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+public class SchemaCacheEntry {
+
+  private final MeasurementSchema measurementSchema;
+
+  private final String alias;
+
+  private final boolean isAligned;
+
+  SchemaCacheEntry(MeasurementSchema measurementSchema, String alias, boolean isAligned) {
+    this.measurementSchema = measurementSchema;
+    this.alias = alias;
+    this.isAligned = isAligned;
+  }
+
+  public String getSchemaEntryId() {
+    return measurementSchema.getMeasurementId();
+  }
+
+  public MeasurementSchema getMeasurementSchema() {
+    return measurementSchema;
+  }
+
+  public TSDataType getTsDataType() {
+    return measurementSchema.getType();
+  }
+
+  public String getAlias() {
+    return alias;
+  }
+
+  public boolean isAligned() {
+    return isAligned;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java
index 3b438114d8..aaa1dc6e59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/DeviceSchemaInfo.java
@@ -50,7 +50,7 @@ public class DeviceSchemaInfo {
 
   public List<MeasurementSchema> getMeasurementSchemaList() {
     return measurementNodeList.stream()
-        .map(SchemaMeasurementNode::getSchema)
+        .map(measurementNode -> measurementNode == null ? null : measurementNode.getSchema())
         .collect(Collectors.toList());
   }
 
@@ -58,6 +58,9 @@ public class DeviceSchemaInfo {
     return measurementNodeList.stream()
         .map(
             measurementNode -> {
+              if (measurementNode == null) {
+                return null;
+              }
               MeasurementPath measurementPath =
                   new MeasurementPath(
                       devicePath.concatNode(measurementNode.getName()),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
index 05051712eb..f15ee39b68 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
@@ -54,10 +54,10 @@ public class PathPatternTree {
     appendPaths(devicePath, Arrays.asList(measurements));
   }
 
-  public PathPatternTree(PartialPath deivcePath, List<String> measurements) {
+  public PathPatternTree(PartialPath devicePath, List<String> measurements) {
     this.root = new PathPatternNode(SQLConstant.ROOT);
     this.pathList = new ArrayList<>();
-    appendPaths(deivcePath, measurements);
+    appendPaths(devicePath, measurements);
   }
 
   public PathPatternTree(Map<PartialPath, List<String>> deviceToMeasurementsMap) {
@@ -251,4 +251,9 @@ public class PathPatternTree {
     }
     return this.getRoot().equalWith(that.getRoot());
   }
+
+  public boolean isEmpty() {
+    return (root.getChildren() == null || root.getChildren().isEmpty())
+        && (pathList == null || pathList.isEmpty());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 365c5c098d..e4100e75e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.mpp.common.schematree;
 
-import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
@@ -40,6 +39,7 @@ import java.util.Deque;
 import java.util.List;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_MATCH_PATTERN;
 import static org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode.SCHEMA_ENTITY_NODE;
 import static org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;
 
@@ -71,14 +71,17 @@ public class SchemaTree {
     return new Pair<>(visitor.getAllResult(), visitor.getNextOffset());
   }
 
+  public List<MeasurementPath> getAllMeasurement() {
+    return searchMeasurementPaths(ALL_MATCH_PATTERN, 0, 0, false).left;
+  }
+
   /**
    * Get all device matching the path pattern.
    *
    * @param pathPattern the pattern of the target devices.
    * @return A HashSet instance which stores info of the devices matching the given path pattern.
    */
-  public List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
-      throws MetadataException {
+  public List<DeviceSchemaInfo> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch) {
     SchemaTreeDeviceVisitor visitor = new SchemaTreeDeviceVisitor(root, pathPattern, isPrefixMatch);
     return visitor.getAllResult();
   }
@@ -89,12 +92,21 @@ public class SchemaTree {
     String[] nodes = devicePath.getNodes();
     SchemaNode cur = root;
     for (int i = 1; i < nodes.length; i++) {
+      if (cur == null) {
+        return null;
+      }
       cur = cur.getChild(nodes[i]);
     }
 
     List<SchemaMeasurementNode> measurementNodeList = new ArrayList<>();
+    SchemaNode node;
     for (String measurement : measurements) {
-      measurementNodeList.add(cur.getChild(measurement).getAsMeasurementNode());
+      node = cur.getChild(measurement);
+      if (node == null) {
+        measurementNodeList.add(null);
+      } else {
+        measurementNodeList.add(node.getAsMeasurementNode());
+      }
     }
 
     return new DeviceSchemaInfo(devicePath, cur.getAsEntityNode().isAligned(), measurementNodeList);
@@ -107,25 +119,31 @@ public class SchemaTree {
   }
 
   private void appendSingleMeasurementPath(MeasurementPath measurementPath) {
-    String[] nodes = measurementPath.getNodes();
+    appendSingleMeasurement(
+        measurementPath,
+        (MeasurementSchema) measurementPath.getMeasurementSchema(),
+        measurementPath.isMeasurementAliasExists() ? measurementPath.getMeasurementAlias() : null,
+        measurementPath.isUnderAlignedEntity());
+  }
+
+  public void appendSingleMeasurement(
+      PartialPath path, MeasurementSchema schema, String alias, boolean isAligned) {
+    String[] nodes = path.getNodes();
     SchemaNode cur = root;
     SchemaNode child;
     for (int i = 1; i < nodes.length; i++) {
       child = cur.getChild(nodes[i]);
       if (child == null) {
         if (i == nodes.length - 1) {
-          SchemaMeasurementNode measurementNode =
-              new SchemaMeasurementNode(
-                  nodes[i], (MeasurementSchema) measurementPath.getMeasurementSchema());
-          if (measurementPath.isMeasurementAliasExists()) {
-            measurementNode.setAlias(measurementPath.getMeasurementAlias());
-            cur.getAsEntityNode()
-                .addAliasChild(measurementPath.getMeasurementAlias(), measurementNode);
+          SchemaMeasurementNode measurementNode = new SchemaMeasurementNode(nodes[i], schema);
+          if (alias != null) {
+            measurementNode.setAlias(alias);
+            cur.getAsEntityNode().addAliasChild(alias, measurementNode);
           }
           child = measurementNode;
         } else if (i == nodes.length - 2) {
           SchemaEntityNode entityNode = new SchemaEntityNode(nodes[i]);
-          entityNode.setAligned(measurementPath.isUnderAlignedEntity());
+          entityNode.setAligned(isAligned);
           child = entityNode;
         } else {
           child = new SchemaInternalNode(nodes[i]);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java
index 10af366554..a237fb19e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/node/SchemaInternalNode.java
@@ -28,7 +28,7 @@ import java.util.Map;
 
 public class SchemaInternalNode extends SchemaNode {
 
-  protected Map<String, SchemaNode> children;
+  protected Map<String, SchemaNode> children = new HashMap<>();
 
   public SchemaInternalNode(String name) {
     super(name);
@@ -36,13 +36,10 @@ public class SchemaInternalNode extends SchemaNode {
 
   @Override
   public SchemaNode getChild(String name) {
-    return children == null ? null : children.get(name);
+    return children.get(name);
   }
 
   public void addChild(String name, SchemaNode child) {
-    if (children == null) {
-      children = new HashMap<>();
-    }
     children.put(name, child);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index e9de9a302e..08d8257560 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -22,31 +22,48 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
 public class ClusterSchemaFetcher implements ISchemaFetcher {
 
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   private final Coordinator coordinator = Coordinator.getInstance();
   private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
+  private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
 
   private static final class ClusterSchemaFetcherHolder {
     private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
@@ -62,7 +79,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
   @Override
   public SchemaTree fetchSchema(PathPatternTree patternTree) {
-    SchemaPartition schemaPartition = partitionFetcher.getSchemaPartition(patternTree);
+    return fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
+  }
+
+  private SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
     Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
         schemaPartition.getSchemaPartitionMap();
     List<String> storageGroups = new ArrayList<>(schemaPartitionMap.keySet());
@@ -70,6 +90,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     SchemaFetchStatement schemaFetchStatement = new SchemaFetchStatement(patternTree);
     schemaFetchStatement.setSchemaPartition(schemaPartition);
 
+    SchemaTree result = executeSchemaFetchQuery(schemaFetchStatement);
+    result.setStorageGroups(storageGroups);
+    return result;
+  }
+
+  private SchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
+
     QueryId queryId =
         new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
     ExecutionResult executionResult =
@@ -84,7 +111,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       if (!tsBlock.isPresent()) {
         break;
       }
-      result.setStorageGroups(storageGroups);
+
       Binary binary;
       SchemaTree fetchedSchemaTree;
       Column column = tsBlock.get().getColumn(0);
@@ -101,22 +128,225 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
   @Override
   public SchemaTree fetchSchemaWithAutoCreate(
-      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
-    // todo implement auto create schema
-    return fetchSchema(new PathPatternTree(devicePath, measurements));
+      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean isAligned) {
+
+    SchemaTree schemaTree = schemaCache.get(devicePath, measurements);
+    Pair<List<String>, List<TSDataType>> missingMeasurements =
+        checkMissingMeasurements(schemaTree, devicePath, measurements, tsDataTypes);
+
+    PathPatternTree patternTree =
+        new PathPatternTree(devicePath, missingMeasurements.left.toArray(new String[0]));
+
+    if (patternTree.isEmpty()) {
+      return schemaTree;
+    }
+
+    SchemaTree remoteSchemaTree;
+
+    if (!config.isAutoCreateSchemaEnabled()) {
+      remoteSchemaTree = fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
+      schemaTree.mergeSchemaTree(remoteSchemaTree);
+      schemaCache.put(remoteSchemaTree);
+      return schemaTree;
+    }
+
+    remoteSchemaTree =
+        fetchSchema(patternTree, partitionFetcher.getOrCreateSchemaPartition(patternTree));
+    schemaTree.mergeSchemaTree(remoteSchemaTree);
+    schemaCache.put(remoteSchemaTree);
+
+    SchemaTree missingSchemaTree =
+        checkAndAutoCreateMissingMeasurements(
+            remoteSchemaTree,
+            devicePath,
+            missingMeasurements.left.toArray(new String[0]),
+            missingMeasurements.right.toArray(new TSDataType[0]),
+            isAligned);
+
+    schemaTree.mergeSchemaTree(missingSchemaTree);
+    schemaCache.put(missingSchemaTree);
+
+    return schemaTree;
   }
 
   @Override
   public SchemaTree fetchSchemaListWithAutoCreate(
-      List<PartialPath> devicePath,
-      List<String[]> measurements,
-      List<TSDataType[]> tsDataTypes,
-      List<Boolean> aligned) {
-    Map<PartialPath, List<String>> deviceToMeasurementMap = new HashMap<>();
-    for (int i = 0; i < devicePath.size(); i++) {
-      deviceToMeasurementMap.put(devicePath.get(i), Arrays.asList(measurements.get(i)));
+      List<PartialPath> devicePathList,
+      List<String[]> measurementsList,
+      List<TSDataType[]> tsDataTypesList,
+      List<Boolean> isAlignedList) {
+
+    SchemaTree schemaTree = new SchemaTree();
+    PathPatternTree patternTree = new PathPatternTree();
+    for (int i = 0; i < devicePathList.size(); i++) {
+      schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i)));
+      patternTree.appendPaths(
+          devicePathList.get(i),
+          checkMissingMeasurements(
+                  schemaTree,
+                  devicePathList.get(i),
+                  measurementsList.get(i),
+                  tsDataTypesList.get(i))
+              .left);
+    }
+
+    if (patternTree.isEmpty()) {
+      return schemaTree;
+    }
+
+    SchemaTree remoteSchemaTree;
+
+    if (!config.isAutoCreateSchemaEnabled()) {
+      remoteSchemaTree = fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
+      schemaTree.mergeSchemaTree(remoteSchemaTree);
+      schemaCache.put(remoteSchemaTree);
+      return schemaTree;
+    }
+
+    remoteSchemaTree =
+        fetchSchema(patternTree, partitionFetcher.getOrCreateSchemaPartition(patternTree));
+    schemaTree.mergeSchemaTree(remoteSchemaTree);
+    schemaCache.put(remoteSchemaTree);
+
+    SchemaTree missingSchemaTree;
+    for (int i = 0; i < devicePathList.size(); i++) {
+      missingSchemaTree =
+          checkAndAutoCreateMissingMeasurements(
+              schemaTree,
+              devicePathList.get(i),
+              measurementsList.get(i),
+              tsDataTypesList.get(i),
+              isAlignedList.get(i));
+      schemaTree.mergeSchemaTree(missingSchemaTree);
+      schemaCache.put(missingSchemaTree);
+    }
+    return schemaTree;
+  }
+
+  private SchemaTree checkAndAutoCreateMissingMeasurements(
+      SchemaTree schemaTree,
+      PartialPath devicePath,
+      String[] measurements,
+      TSDataType[] tsDataTypes,
+      boolean isAligned) {
+
+    Pair<List<String>, List<TSDataType>> checkResult =
+        checkMissingMeasurements(schemaTree, devicePath, measurements, tsDataTypes);
+
+    List<String> missingMeasurements = checkResult.left;
+    List<TSDataType> dataTypesOfMissingMeasurement = checkResult.right;
+
+    if (missingMeasurements.isEmpty()) {
+      return new SchemaTree();
+    }
+
+    internalCreateTimeseries(
+        devicePath, missingMeasurements, dataTypesOfMissingMeasurement, isAligned);
+
+    SchemaTree reFetchSchemaTree =
+        fetchSchema(new PathPatternTree(devicePath, missingMeasurements));
+
+    Pair<List<String>, List<TSDataType>> recheckResult =
+        checkMissingMeasurements(
+            reFetchSchemaTree,
+            devicePath,
+            missingMeasurements.toArray(new String[0]),
+            dataTypesOfMissingMeasurement.toArray(new TSDataType[0]));
+
+    missingMeasurements = recheckResult.left;
+    if (!missingMeasurements.isEmpty()) {
+      StringBuilder stringBuilder = new StringBuilder();
+      stringBuilder.append("(");
+      for (String missingMeasurement : missingMeasurements) {
+        stringBuilder.append(missingMeasurement).append(" ");
+      }
+      stringBuilder.append(")");
+      throw new RuntimeException(
+          String.format(
+              "Failed to auto create schema, devicePath: %s, measurements: %s",
+              devicePath.getFullPath(), stringBuilder));
+    }
+
+    return reFetchSchemaTree;
+  }
+
+  private Pair<List<String>, List<TSDataType>> checkMissingMeasurements(
+      SchemaTree schemaTree,
+      PartialPath devicePath,
+      String[] measurements,
+      TSDataType[] tsDataTypes) {
+    DeviceSchemaInfo deviceSchemaInfo =
+        schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+    if (deviceSchemaInfo == null) {
+      return new Pair<>(Arrays.asList(measurements), Arrays.asList(tsDataTypes));
+    }
+
+    List<String> missingMeasurements = new ArrayList<>();
+    List<TSDataType> dataTypesOfMissingMeasurement = new ArrayList<>();
+    List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
+    for (int i = 0; i < measurements.length; i++) {
+      if (schemaList.get(i) == null) {
+        missingMeasurements.add(measurements[i]);
+        dataTypesOfMissingMeasurement.add(tsDataTypes[i]);
+      }
+    }
+
+    return new Pair<>(missingMeasurements, dataTypesOfMissingMeasurement);
+  }
+
+  private void internalCreateTimeseries(
+      PartialPath devicePath,
+      List<String> measurements,
+      List<TSDataType> tsDataTypes,
+      boolean isAligned) {
+
+    if (isAligned) {
+      CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement =
+          new CreateAlignedTimeSeriesStatement();
+      createAlignedTimeSeriesStatement.setDevicePath(devicePath);
+      createAlignedTimeSeriesStatement.setMeasurements(measurements);
+      createAlignedTimeSeriesStatement.setDataTypes(tsDataTypes);
+      List<TSEncoding> encodings = new ArrayList<>();
+      List<CompressionType> compressors = new ArrayList<>();
+      for (TSDataType dataType : tsDataTypes) {
+        encodings.add(getDefaultEncoding(dataType));
+        compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
+      }
+      createAlignedTimeSeriesStatement.setEncodings(encodings);
+      createAlignedTimeSeriesStatement.setCompressors(compressors);
+
+      executeCreateStatement(createAlignedTimeSeriesStatement);
+    } else {
+      // todo @zyk implement batch create
+      for (int i = 0; i < measurements.size(); i++) {
+        CreateTimeSeriesStatement createTimeSeriesStatement = new CreateTimeSeriesStatement();
+        createTimeSeriesStatement.setPath(devicePath.concatNode(measurements.get(i)));
+        createTimeSeriesStatement.setDataType(tsDataTypes.get(i));
+        createTimeSeriesStatement.setEncoding(getDefaultEncoding(tsDataTypes.get(i)));
+        createTimeSeriesStatement.setCompressor(
+            TSFileDescriptor.getInstance().getConfig().getCompressor());
+        createTimeSeriesStatement.setProps(Collections.emptyMap());
+
+        executeCreateStatement(createTimeSeriesStatement);
+      }
+    }
+  }
+
+  private void executeCreateStatement(Statement statement) {
+    QueryId queryId =
+        new QueryId(String.valueOf(SessionManager.getInstance().requestQueryId(false)));
+    ExecutionResult executionResult =
+        coordinator.execute(statement, queryId, null, "", partitionFetcher, this);
+    // TODO: throw exception
+    try {
+      int statusCode = executionResult.status.getCode();
+      if (statusCode != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && statusCode != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+        throw new RuntimeException(
+            "cannot auto create schema, status is: " + executionResult.status);
+      }
+    } finally {
+      coordinator.getQueryExecution(queryId).stopAndCleanup();
     }
-    // todo implement auto create schema
-    return fetchSchema(new PathPatternTree(deviceToMeasurementMap));
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index facec3218f..91f9fbf375 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.metadata.cache;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -29,6 +31,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class DataNodeSchemaCacheTest {
   DataNodeSchemaCache dataNodeSchemaCache;
@@ -50,46 +53,100 @@ public class DataNodeSchemaCacheTest {
     measurements[0] = "s1";
     measurements[1] = "s2";
     measurements[2] = "s3";
-    TSDataType[] tsDataTypes = new TSDataType[3];
-    tsDataTypes[0] = TSDataType.INT32;
-    tsDataTypes[1] = TSDataType.FLOAT;
-    tsDataTypes[2] = TSDataType.BOOLEAN;
-
-    Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap1 =
-        dataNodeSchemaCache.getSchemaEntityWithAutoCreate(
-            device1, measurements, tsDataTypes, false);
+
+    dataNodeSchemaCache.put(generateSchemaTree1());
+
+    Map<PartialPath, SchemaCacheEntry> schemaCacheEntryMap =
+        dataNodeSchemaCache.get(device1, measurements).getAllMeasurement().stream()
+            .collect(
+                Collectors.toMap(
+                    o -> new PartialPath(o.getNodes()),
+                    o ->
+                        new SchemaCacheEntry(
+                            (MeasurementSchema) o.getMeasurementSchema(),
+                            null,
+                            o.isUnderAlignedEntity())));
     Assert.assertEquals(
         TSDataType.INT32,
-        schemaCacheEntityMap1.get(new PartialPath("root.sg1.d1.s1")).getTsDataType());
+        schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s1")).getTsDataType());
     Assert.assertEquals(
         TSDataType.FLOAT,
-        schemaCacheEntityMap1.get(new PartialPath("root.sg1.d1.s2")).getTsDataType());
+        schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s2")).getTsDataType());
     Assert.assertEquals(
         TSDataType.BOOLEAN,
-        schemaCacheEntityMap1.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
-    Assert.assertEquals(3, dataNodeSchemaCache.getSchemaEntityCache().estimatedSize());
+        schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
+    Assert.assertEquals(3, dataNodeSchemaCache.estimatedSize());
 
     String[] otherMeasurements = new String[3];
     otherMeasurements[0] = "s3";
     otherMeasurements[1] = "s4";
     otherMeasurements[2] = "s5";
-    TSDataType[] otherTsDataTypes = new TSDataType[3];
-    otherTsDataTypes[0] = TSDataType.BOOLEAN;
-    otherTsDataTypes[1] = TSDataType.TEXT;
-    otherTsDataTypes[2] = TSDataType.INT64;
-
-    Map<PartialPath, SchemaCacheEntity> schemaCacheEntityMap2 =
-        dataNodeSchemaCache.getSchemaEntityWithAutoCreate(
-            device1, otherMeasurements, otherTsDataTypes, false);
+
+    dataNodeSchemaCache.put(generateSchemaTree2());
+
+    schemaCacheEntryMap =
+        dataNodeSchemaCache.get(device1, otherMeasurements).getAllMeasurement().stream()
+            .collect(
+                Collectors.toMap(
+                    o -> new PartialPath(o.getNodes()),
+                    o ->
+                        new SchemaCacheEntry(
+                            (MeasurementSchema) o.getMeasurementSchema(),
+                            null,
+                            o.isUnderAlignedEntity())));
     Assert.assertEquals(
         TSDataType.BOOLEAN,
-        schemaCacheEntityMap2.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
+        schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s3")).getTsDataType());
     Assert.assertEquals(
         TSDataType.TEXT,
-        schemaCacheEntityMap2.get(new PartialPath("root.sg1.d1.s4")).getTsDataType());
+        schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s4")).getTsDataType());
     Assert.assertEquals(
         TSDataType.INT64,
-        schemaCacheEntityMap2.get(new PartialPath("root.sg1.d1.s5")).getTsDataType());
-    Assert.assertEquals(5, dataNodeSchemaCache.getSchemaEntityCache().estimatedSize());
+        schemaCacheEntryMap.get(new PartialPath("root.sg1.d1.s5")).getTsDataType());
+    Assert.assertEquals(5, dataNodeSchemaCache.estimatedSize());
+  }
+
+  private SchemaTree generateSchemaTree1() throws IllegalPathException {
+    SchemaTree schemaTree = new SchemaTree();
+
+    schemaTree.appendSingleMeasurement(
+        new PartialPath("root.sg1.d1.s1"),
+        new MeasurementSchema("s1", TSDataType.INT32),
+        null,
+        false);
+    schemaTree.appendSingleMeasurement(
+        new PartialPath("root.sg1.d1.s2"),
+        new MeasurementSchema("s2", TSDataType.FLOAT),
+        null,
+        false);
+    schemaTree.appendSingleMeasurement(
+        new PartialPath("root.sg1.d1.s3"),
+        new MeasurementSchema("s3", TSDataType.BOOLEAN),
+        null,
+        false);
+
+    return schemaTree;
+  }
+
+  private SchemaTree generateSchemaTree2() throws IllegalPathException {
+    SchemaTree schemaTree = new SchemaTree();
+
+    schemaTree.appendSingleMeasurement(
+        new PartialPath("root.sg1.d1.s3"),
+        new MeasurementSchema("s3", TSDataType.BOOLEAN),
+        null,
+        false);
+    schemaTree.appendSingleMeasurement(
+        new PartialPath("root.sg1.d1.s4"),
+        new MeasurementSchema("s4", TSDataType.TEXT),
+        null,
+        false);
+    schemaTree.appendSingleMeasurement(
+        new PartialPath("root.sg1.d1.s5"),
+        new MeasurementSchema("s5", TSDataType.INT64),
+        null,
+        false);
+
+    return schemaTree;
   }
 }