You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/10/04 12:24:37 UTC

[iotdb] branch master updated: Fix concurrent timeseries auto creation exception (#7513)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a3b8ed077 Fix concurrent timeseries auto creation exception (#7513)
6a3b8ed077 is described below

commit 6a3b8ed0778e0c3f62a546e977d8984edf6a3e5a
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Tue Oct 4 20:24:32 2022 +0800

    Fix concurrent timeseries auto creation exception (#7513)
    
    Fix concurrent timeseries auto creation exception (#7513)
---
 .../schemaregion/SchemaRegionMemoryImpl.java       | 144 +++++++++------------
 1 file changed, 58 insertions(+), 86 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index c0283af12b..4b02c508fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -31,10 +31,10 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine;
-import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
@@ -673,46 +673,6 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
-  /**
-   * Add one timeseries to metadata tree, if the timeseries already exists, throw exception
-   *
-   * @param path the timeseries path
-   * @param dataType the dateType {@code DataType} of the timeseries
-   * @param encoding the encoding function {@code Encoding} of the timeseries
-   * @param compressor the compressor function {@code Compressor} of the time series
-   */
-  private void createTimeseries(
-      PartialPath path,
-      TSDataType dataType,
-      TSEncoding encoding,
-      CompressionType compressor,
-      Map<String, String> props)
-      throws MetadataException {
-    try {
-      createTimeseries(
-          new CreateTimeSeriesPlan(path, dataType, encoding, compressor, props, null, null, null));
-    } catch (PathAlreadyExistException | AliasAlreadyExistException e) {
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            "Ignore PathAlreadyExistException and AliasAlreadyExistException when Concurrent inserting"
-                + " a non-exist time series {}",
-            path);
-      }
-    }
-  }
-
-  public void createAlignedTimeSeries(
-      PartialPath prefixPath,
-      List<String> measurements,
-      List<TSDataType> dataTypes,
-      List<TSEncoding> encodings,
-      List<CompressionType> compressors)
-      throws MetadataException {
-    createAlignedTimeSeries(
-        new CreateAlignedTimeSeriesPlan(
-            prefixPath, measurements, dataTypes, encodings, compressors, null, null, null));
-  }
-
   /**
    * create aligned timeseries
    *
@@ -1822,10 +1782,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
           if (!plan.isAligned()) {
             internalCreateTimeseries(devicePath.concatNode(measurement), plan.getDataTypes()[loc]);
           } else {
-            internalAlignedCreateTimeseries(
-                devicePath,
-                Collections.singletonList(measurement),
-                Collections.singletonList(plan.getDataTypes()[loc]));
+            internalAlignedCreateTimeseries(devicePath, measurement, plan.getDataTypes()[loc]);
           }
           // after creating timeseries, the deviceMNode has been replaced by a new entityMNode
           deviceMNode = mtree.getNodeByPath(devicePath);
@@ -1866,49 +1823,69 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   /** create timeseries ignoring PathAlreadyExistException */
   private void internalCreateTimeseries(PartialPath path, TSDataType dataType)
       throws MetadataException {
-    createTimeseries(
+    internalCreateTimeseries(
         path,
         dataType,
         getDefaultEncoding(dataType),
-        TSFileDescriptor.getInstance().getConfig().getCompressor(),
-        Collections.emptyMap());
+        TSFileDescriptor.getInstance().getConfig().getCompressor());
   }
 
-  /** create timeseries ignoring PathAlreadyExistException */
+  /** create timeseries ignoring MeasurementAlreadyExistException */
   private void internalCreateTimeseries(
       PartialPath path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)
       throws MetadataException {
-    if (encoding == null) {
-      encoding = getDefaultEncoding(dataType);
-    }
-    if (compressor == null) {
-      compressor = TSFileDescriptor.getInstance().getConfig().getCompressor();
+    try {
+      createTimeseries(
+          new CreateTimeSeriesPlan(
+              path, dataType, encoding, compressor, Collections.emptyMap(), null, null, null));
+    } catch (MeasurementAlreadyExistException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Ignore MeasurementAlreadyExistException when concurrent inserting"
+                + " a non-exist time series {}",
+            path);
+      }
     }
-    createTimeseries(path, dataType, encoding, compressor, Collections.emptyMap());
   }
 
-  /** create aligned timeseries ignoring PathAlreadyExistException */
+  /** create aligned timeseries ignoring MeasurementAlreadyExistException */
   private void internalAlignedCreateTimeseries(
-      PartialPath prefixPath,
-      List<String> measurements,
-      List<TSDataType> dataTypes,
-      List<TSEncoding> encodings,
-      List<CompressionType> compressors)
+      PartialPath devicePath,
+      String measurement,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor)
       throws MetadataException {
-    createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings, compressors);
+    try {
+      createAlignedTimeSeries(
+          new CreateAlignedTimeSeriesPlan(
+              devicePath,
+              Collections.singletonList(measurement),
+              Collections.singletonList(dataType),
+              Collections.singletonList(encoding),
+              Collections.singletonList(compressor),
+              null,
+              null,
+              null));
+    } catch (MeasurementAlreadyExistException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Ignore MeasurementAlreadyExistException when concurrent inserting"
+                + " a non-exist time series {}",
+            e.getMeasurementPath());
+      }
+    }
   }
 
-  /** create aligned timeseries ignoring PathAlreadyExistException */
+  /** create aligned timeseries ignoring MeasurementAlreadyExistException */
   private void internalAlignedCreateTimeseries(
-      PartialPath prefixPath, List<String> measurements, List<TSDataType> dataTypes)
-      throws MetadataException {
-    List<TSEncoding> encodings = new ArrayList<>();
-    List<CompressionType> compressors = new ArrayList<>();
-    for (TSDataType dataType : dataTypes) {
-      encodings.add(getDefaultEncoding(dataType));
-      compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
-    }
-    createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings, compressors);
+      PartialPath devicePath, String measurement, TSDataType dataType) throws MetadataException {
+    internalAlignedCreateTimeseries(
+        devicePath,
+        measurement,
+        dataType,
+        getDefaultEncoding(dataType),
+        TSFileDescriptor.getInstance().getConfig().getCompressor());
   }
 
   @Override
@@ -1928,24 +1905,19 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
         measurementMNode = getMeasurementMNode(deviceMNode, measurements[i]);
         if (measurementMNode == null) {
           if (config.isAutoCreateSchemaEnabled()) {
+            TSDataType dataType = getDataType.apply(i);
+            TSEncoding encoding =
+                encodings[i] == null ? getDefaultEncoding(dataType) : encodings[i];
+            CompressionType compressionType =
+                compressionTypes[i] == null
+                    ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+                    : compressionTypes[i];
             if (aligned) {
-              TSDataType dataType = getDataType.apply(i);
               internalAlignedCreateTimeseries(
-                  devicePath,
-                  Collections.singletonList(measurements[i]),
-                  Collections.singletonList(dataType),
-                  Collections.singletonList(
-                      encodings[i] == null ? getDefaultEncoding(dataType) : encodings[i]),
-                  Collections.singletonList(
-                      compressionTypes[i] == null
-                          ? TSFileDescriptor.getInstance().getConfig().getCompressor()
-                          : compressionTypes[i]));
+                  devicePath, measurements[i], dataType, encoding, compressionType);
             } else {
               internalCreateTimeseries(
-                  devicePath.concatNode(measurements[i]),
-                  getDataType.apply(i),
-                  encodings[i],
-                  compressionTypes[i]);
+                  devicePath.concatNode(measurements[i]), dataType, encoding, compressionType);
             }
             // after creating timeseries, the deviceMNode has been replaced by a new entityMNode
             deviceMNode = mtree.getNodeByPath(devicePath);