You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/10/04 12:24:37 UTC
[iotdb] branch master updated: Fix concurrent timeseries auto creation exception (#7513)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6a3b8ed077 Fix concurrent timeseries auto creation exception (#7513)
6a3b8ed077 is described below
commit 6a3b8ed0778e0c3f62a546e977d8984edf6a3e5a
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Tue Oct 4 20:24:32 2022 +0800
Fix concurrent timeseries auto creation exception (#7513)
Fix concurrent timeseries auto creation exception (#7513)
---
.../schemaregion/SchemaRegionMemoryImpl.java | 144 +++++++++------------
1 file changed, 58 insertions(+), 86 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index c0283af12b..4b02c508fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -31,10 +31,10 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine;
-import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
@@ -673,46 +673,6 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
}
}
- /**
- * Add one timeseries to metadata tree, if the timeseries already exists, throw exception
- *
- * @param path the timeseries path
- * @param dataType the dateType {@code DataType} of the timeseries
- * @param encoding the encoding function {@code Encoding} of the timeseries
- * @param compressor the compressor function {@code Compressor} of the time series
- */
- private void createTimeseries(
- PartialPath path,
- TSDataType dataType,
- TSEncoding encoding,
- CompressionType compressor,
- Map<String, String> props)
- throws MetadataException {
- try {
- createTimeseries(
- new CreateTimeSeriesPlan(path, dataType, encoding, compressor, props, null, null, null));
- } catch (PathAlreadyExistException | AliasAlreadyExistException e) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Ignore PathAlreadyExistException and AliasAlreadyExistException when Concurrent inserting"
- + " a non-exist time series {}",
- path);
- }
- }
- }
-
- public void createAlignedTimeSeries(
- PartialPath prefixPath,
- List<String> measurements,
- List<TSDataType> dataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors)
- throws MetadataException {
- createAlignedTimeSeries(
- new CreateAlignedTimeSeriesPlan(
- prefixPath, measurements, dataTypes, encodings, compressors, null, null, null));
- }
-
/**
* create aligned timeseries
*
@@ -1822,10 +1782,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
if (!plan.isAligned()) {
internalCreateTimeseries(devicePath.concatNode(measurement), plan.getDataTypes()[loc]);
} else {
- internalAlignedCreateTimeseries(
- devicePath,
- Collections.singletonList(measurement),
- Collections.singletonList(plan.getDataTypes()[loc]));
+ internalAlignedCreateTimeseries(devicePath, measurement, plan.getDataTypes()[loc]);
}
// after creating timeseries, the deviceMNode has been replaced by a new entityMNode
deviceMNode = mtree.getNodeByPath(devicePath);
@@ -1866,49 +1823,69 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
/** create timeseries ignoring PathAlreadyExistException */
private void internalCreateTimeseries(PartialPath path, TSDataType dataType)
throws MetadataException {
- createTimeseries(
+ internalCreateTimeseries(
path,
dataType,
getDefaultEncoding(dataType),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
- Collections.emptyMap());
+ TSFileDescriptor.getInstance().getConfig().getCompressor());
}
- /** create timeseries ignoring PathAlreadyExistException */
+ /** create timeseries ignoring MeasurementAlreadyExistException */
private void internalCreateTimeseries(
PartialPath path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)
throws MetadataException {
- if (encoding == null) {
- encoding = getDefaultEncoding(dataType);
- }
- if (compressor == null) {
- compressor = TSFileDescriptor.getInstance().getConfig().getCompressor();
+ try {
+ createTimeseries(
+ new CreateTimeSeriesPlan(
+ path, dataType, encoding, compressor, Collections.emptyMap(), null, null, null));
+ } catch (MeasurementAlreadyExistException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Ignore MeasurementAlreadyExistException when concurrent inserting"
+ + " a non-exist time series {}",
+ path);
+ }
}
- createTimeseries(path, dataType, encoding, compressor, Collections.emptyMap());
}
- /** create aligned timeseries ignoring PathAlreadyExistException */
+ /** create aligned timeseries ignoring MeasurementAlreadyExistException */
private void internalAlignedCreateTimeseries(
- PartialPath prefixPath,
- List<String> measurements,
- List<TSDataType> dataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors)
+ PartialPath devicePath,
+ String measurement,
+ TSDataType dataType,
+ TSEncoding encoding,
+ CompressionType compressor)
throws MetadataException {
- createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings, compressors);
+ try {
+ createAlignedTimeSeries(
+ new CreateAlignedTimeSeriesPlan(
+ devicePath,
+ Collections.singletonList(measurement),
+ Collections.singletonList(dataType),
+ Collections.singletonList(encoding),
+ Collections.singletonList(compressor),
+ null,
+ null,
+ null));
+ } catch (MeasurementAlreadyExistException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Ignore MeasurementAlreadyExistException when concurrent inserting"
+ + " a non-exist time series {}",
+ e.getMeasurementPath());
+ }
+ }
}
- /** create aligned timeseries ignoring PathAlreadyExistException */
+ /** create aligned timeseries ignoring MeasurementAlreadyExistException */
private void internalAlignedCreateTimeseries(
- PartialPath prefixPath, List<String> measurements, List<TSDataType> dataTypes)
- throws MetadataException {
- List<TSEncoding> encodings = new ArrayList<>();
- List<CompressionType> compressors = new ArrayList<>();
- for (TSDataType dataType : dataTypes) {
- encodings.add(getDefaultEncoding(dataType));
- compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
- }
- createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings, compressors);
+ PartialPath devicePath, String measurement, TSDataType dataType) throws MetadataException {
+ internalAlignedCreateTimeseries(
+ devicePath,
+ measurement,
+ dataType,
+ getDefaultEncoding(dataType),
+ TSFileDescriptor.getInstance().getConfig().getCompressor());
}
@Override
@@ -1928,24 +1905,19 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
measurementMNode = getMeasurementMNode(deviceMNode, measurements[i]);
if (measurementMNode == null) {
if (config.isAutoCreateSchemaEnabled()) {
+ TSDataType dataType = getDataType.apply(i);
+ TSEncoding encoding =
+ encodings[i] == null ? getDefaultEncoding(dataType) : encodings[i];
+ CompressionType compressionType =
+ compressionTypes[i] == null
+ ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+ : compressionTypes[i];
if (aligned) {
- TSDataType dataType = getDataType.apply(i);
internalAlignedCreateTimeseries(
- devicePath,
- Collections.singletonList(measurements[i]),
- Collections.singletonList(dataType),
- Collections.singletonList(
- encodings[i] == null ? getDefaultEncoding(dataType) : encodings[i]),
- Collections.singletonList(
- compressionTypes[i] == null
- ? TSFileDescriptor.getInstance().getConfig().getCompressor()
- : compressionTypes[i]));
+ devicePath, measurements[i], dataType, encoding, compressionType);
} else {
internalCreateTimeseries(
- devicePath.concatNode(measurements[i]),
- getDataType.apply(i),
- encodings[i],
- compressionTypes[i]);
+ devicePath.concatNode(measurements[i]), dataType, encoding, compressionType);
}
// after creating timeseries, the deviceMNode has been replaced by a new entityMNode
deviceMNode = mtree.getNodeByPath(devicePath);