You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/25 08:25:59 UTC
[incubator-iotdb] branch master updated: [IOTDB-125] Potential
Concurrency bug while deleting and inserting happen together (#1088)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e2656a2 [IOTDB-125] Potential Concurrency bug while deleting and inserting happen together (#1088)
e2656a2 is described below
commit e2656a29dc10e4c44c4c3ed4148c6b14a7f27ae6
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Sat Apr 25 16:25:47 2020 +0800
[IOTDB-125] Potential Concurrency bug while deleting and inserting happen together (#1088)
* add a lock in each MNode
* delete duplicated finally block and retain the reconnect() method
* add try catch in TSServiceImpl
---
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 12 +--
.../engine/storagegroup/StorageGroupProcessor.java | 57 +++++++-------
.../exception/metadata/DeleteFailedException.java | 38 +++++++++
.../org/apache/iotdb/db/metadata/MManager.java | 90 ++++++++++++++--------
.../iotdb/db/metadata/mnode/InternalMNode.java | 59 ++++++++++++--
.../org/apache/iotdb/db/metadata/mnode/MNode.java | 8 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 67 +++++++++++-----
.../org/apache/iotdb/db/service/TSServiceImpl.java | 26 ++++++-
.../apache/iotdb/db/integration/IoTDBLastIT.java | 6 +-
.../iotdb/db/metadata/MManagerImproveTest.java | 24 +++---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
11 files changed, 273 insertions(+), 115 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 278d1f9..cdd9a82 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -174,17 +174,11 @@ public class IoTDBStatement implements Statement {
return executeSQL(sql);
} catch (TException e) {
if (reConnect()) {
- try {
- return executeSQL(sql);
- } catch (TException e2) {
- throw new SQLException(
- String.format("Fail to execute %s after reconnecting. please check server status",
- sql), e2);
- }
+ throw new SQLException(String.format("Fail to execute %s", sql), e);
} else {
throw new SQLException(String
- .format("Fail to reconnect to server when executing %s. please check server status",
- sql), e);
+ .format("Fail to reconnect to server when executing %s. please check server status",
+ sql), e);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index c3a6495..803e147 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,27 +18,6 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -61,16 +40,12 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.LoadFileException;
-import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
-import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.*;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.mnode.InternalMNode;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -95,6 +70,17 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
* TsFileProcessor in the working status. <br/>
@@ -625,9 +611,9 @@ public class StorageGroupProcessor {
public void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
throws WriteProcessException {
+ MNode node = null;
try {
- MNode node =
- MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
+ node = MManager.getInstance().getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
String[] measurementList = plan.getMeasurements();
for (int i = 0; i < measurementList.length; i++) {
// Update cached last value with high priority
@@ -637,6 +623,10 @@ public class StorageGroupProcessor {
}
} catch (MetadataException e) {
throw new WriteProcessException(e);
+ } finally {
+ if (node != null) {
+ ((InternalMNode) node).readUnlock();
+ }
}
}
@@ -673,18 +663,23 @@ public class StorageGroupProcessor {
public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
throws WriteProcessException {
+ MNode node = null;
try {
- MNode node =
- MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
+ node = MManager.getInstance().getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
String[] measurementList = plan.getMeasurements();
for (int i = 0; i < measurementList.length; i++) {
// Update cached last value with high priority
MNode measurementNode = node.getChild(measurementList[i]);
+
((LeafMNode) measurementNode)
.updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
}
} catch (MetadataException | QueryProcessException e) {
throw new WriteProcessException(e);
+ } finally {
+ if (node != null) {
+ ((InternalMNode) node).readUnlock();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/DeleteFailedException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DeleteFailedException.java
new file mode 100644
index 0000000..a4e46e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DeleteFailedException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class DeleteFailedException extends MetadataException {
+
+ private String name;
+
+ public DeleteFailedException(String name) {
+ super(String.format("Node [%s] is being used. Deletion failed.", name),
+ TSStatusCode.NODE_DELETE_FAILED_ERROR.getStatusCode());
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 356992a..53f6f10 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -25,11 +25,7 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.metadata.*;
import org.apache.iotdb.db.metadata.mnode.InternalMNode;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
@@ -254,9 +250,13 @@ public class MManager {
createTimeseries(plan, offset);
break;
case MetadataOperationType.DELETE_TIMESERIES:
- for (String deleteStorageGroup : deleteTimeseries(args[1])) {
+ Pair<Set<String>, String> pair = deleteTimeseries(args[1]);
+ for (String deleteStorageGroup : pair.left) {
StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(deleteStorageGroup);
}
+ if (!pair.right.isEmpty()) {
+ throw new DeleteFailedException(pair.right);
+ }
break;
case MetadataOperationType.SET_STORAGE_GROUP:
setStorageGroup(args[1]);
@@ -366,10 +366,11 @@ public class MManager {
* Delete all timeseries under the given path, may cross different storage group
*
* @param prefixPath path to be deleted, could be root or a prefix path or a full path
- * @return a set contains StorageGroups that contain no more timeseries after this deletion and
- * files of such StorageGroups should be deleted to reclaim disk space.
+ * @return 1. The Set contains the StorageGroups that contain no more timeseries after this deletion;
+ * the files of such StorageGroups should be deleted to reclaim disk space.
+ * 2. The String is the comma-separated list of timeseries whose deletion failed (empty if none failed).
*/
- public Set<String> deleteTimeseries(String prefixPath) throws MetadataException {
+ public Pair<Set<String>, String> deleteTimeseries(String prefixPath) throws MetadataException {
lock.writeLock().lock();
if (isStorageGroup(prefixPath)) {
@@ -391,13 +392,18 @@ public class MManager {
// Monitor storage group seriesPath is not allowed to be deleted
allTimeseries.removeIf(p -> p.startsWith(MonitorConstants.STAT_STORAGE_GROUP_PREFIX));
+ Set<String> failedNames = new HashSet<>();
for (String p : allTimeseries) {
- String emptyStorageGroup = deleteOneTimeseriesAndUpdateStatisticsAndLog(p);
- if (emptyStorageGroup != null) {
- emptyStorageGroups.add(emptyStorageGroup);
+ try {
+ String emptyStorageGroup = deleteOneTimeseriesAndUpdateStatisticsAndLog(p);
+ if (emptyStorageGroup != null) {
+ emptyStorageGroups.add(emptyStorageGroup);
+ }
+ } catch (DeleteFailedException e) {
+ failedNames.add(e.getName());
}
}
- return emptyStorageGroups;
+ return new Pair<>(emptyStorageGroups, String.join(",", failedNames));
} catch (IOException e) {
throw new MetadataException(e.getMessage());
} finally {
@@ -857,44 +863,62 @@ public class MManager {
/**
* get device node, if the storage group is not set, create it when autoCreateSchema is true
*
+ * !!!!!!Attention!!!!!
+ * The caller must call readUnlock() on the returned node once it has finished using it.
* @param path path
*/
- public MNode getDeviceNodeWithAutoCreateStorageGroup(String path, boolean autoCreateSchema,
+ public MNode getDeviceNodeWithAutoCreateAndReadLock(String path, boolean autoCreateSchema,
int sgLevel) throws MetadataException {
lock.readLock().lock();
MNode node = null;
- boolean shouldSetStorageGroup = false;
+ boolean shouldSetStorageGroup;
try {
node = mNodeCache.get(path);
+ return node;
} catch (CacheException e) {
if (!autoCreateSchema) {
throw new PathNotExistException(path);
- } else {
- shouldSetStorageGroup = e.getCause() instanceof StorageGroupNotSetException;
}
} finally {
+ if (node != null) {
+ ((InternalMNode) node).readLock();
+ }
lock.readLock().unlock();
- lock.writeLock().lock();
+ }
+
+ lock.writeLock().lock();
+ try {
try {
- if (autoCreateSchema) {
- if (shouldSetStorageGroup) {
- String storageGroupName = MetaUtils.getStorageGroupNameByLevel(path, sgLevel);
- setStorageGroup(storageGroupName);
- }
- node = mtree.getDeviceNodeWithAutoCreating(path);
- }
- } catch (StorageGroupAlreadySetException e) {
- // ignore set storage group concurrently
- node = mtree.getDeviceNodeWithAutoCreating(path);
- } finally {
- lock.writeLock().unlock();
+ node = mNodeCache.get(path);
+ return node;
+ } catch (CacheException e) {
+ shouldSetStorageGroup = e.getCause() instanceof StorageGroupNotSetException;
+ }
+
+ if (shouldSetStorageGroup) {
+ String storageGroupName = MetaUtils.getStorageGroupNameByLevel(path, sgLevel);
+ setStorageGroup(storageGroupName);
}
+ node = mtree.getDeviceNodeWithAutoCreating(path);
+ return node;
+ } catch (StorageGroupAlreadySetException e) {
+ // ignore set storage group concurrently
+ node = mtree.getDeviceNodeWithAutoCreating(path);
+ return node;
+ } finally {
+ if (node != null) {
+ ((InternalMNode) node).readLock();
+ }
+ lock.writeLock().unlock();
}
- return node;
}
- public MNode getDeviceNodeWithAutoCreateStorageGroup(String path) throws MetadataException {
- return getDeviceNodeWithAutoCreateStorageGroup(path, config.isAutoCreateSchemaEnabled(),
+ /**
+ * !!!!!!Attention!!!!!
+ * The caller must call readUnlock() on the returned node once it has finished using it.
+ */
+ public MNode getDeviceNodeWithAutoCreateAndReadLock(String path) throws MetadataException {
+ return getDeviceNodeWithAutoCreateAndReadLock(path, config.isAutoCreateSchemaEnabled(),
config.getDefaultStorageGroupLevel());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java
index 4e35729..2d199cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java
@@ -18,8 +18,15 @@
*/
package org.apache.iotdb.db.metadata.mnode;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
public class InternalMNode extends MNode {
@@ -28,6 +35,8 @@ public class InternalMNode extends MNode {
private Map<String, MNode> children;
private Map<String, MNode> aliasChildren;
+ protected ReadWriteLock lock = new ReentrantReadWriteLock();
+
public InternalMNode(MNode parent, String name) {
super(parent, name);
this.children = new LinkedHashMap<>();
@@ -45,14 +54,38 @@ public class InternalMNode extends MNode {
}
+ /**
+ * When deleting a LeafMNode, lock its parent (this node); when deleting an InternalMNode, lock that child node itself.
+ */
@Override
- public void deleteChild(String name) {
- children.remove(name);
- }
+ public void deleteChild(String name) throws DeleteFailedException {
+ if (children.containsKey(name)) {
+ Lock writeLock;
+ // if its child node is leaf node, we need to acquire the write lock of the current device node
+ if (children.get(name) instanceof LeafMNode) {
+ writeLock = lock.writeLock();
+ } else {
+ // otherwise, we only need to acquire the write lock of its child node.
+ writeLock = ((InternalMNode) children.get(name)).lock.writeLock();
+ }
+ if (writeLock.tryLock()) {
+ children.remove(name);
+ writeLock.unlock();
+ } else {
+ throw new DeleteFailedException(getFullPath() + PATH_SEPARATOR + name);
+ }
+ }
+}
@Override
- public void deleteAliasChild(String alias) {
- aliasChildren.remove(alias);
+ public void deleteAliasChild(String alias) throws DeleteFailedException {
+
+ if (lock.writeLock().tryLock()) {
+ aliasChildren.remove(alias);
+ lock.writeLock().unlock();
+ } else {
+ throw new DeleteFailedException(getFullPath() + PATH_SEPARATOR + alias);
+ }
}
@Override
@@ -78,4 +111,20 @@ public class InternalMNode extends MNode {
public Map<String, MNode> getChildren() {
return children;
}
+
+ public void readLock() {
+ InternalMNode node = this;
+ while (node != null) {
+ node.lock.readLock().lock();
+ node = (InternalMNode) node.parent;
+ }
+ }
+
+ public void readUnlock() {
+ InternalMNode node = this;
+ while (node != null) {
+ node.lock.readLock().unlock();
+ node = (InternalMNode) node.parent;
+ }
+ }
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
index ff44cbf..d65f575 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata.mnode;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import java.io.Serializable;
import java.util.Map;
@@ -36,13 +37,14 @@ public abstract class MNode implements Serializable {
*/
private String name;
- private MNode parent;
+ protected MNode parent;
/**
* from root to this node, only be set when used once
*/
protected String fullPath;
+
/**
* Constructor of MNode.
*/
@@ -64,12 +66,12 @@ public abstract class MNode implements Serializable {
/**
* delete a child
*/
- public abstract void deleteChild(String name);
+ public abstract void deleteChild(String name) throws DeleteFailedException;
/**
* delete the alias of a child
*/
- public abstract void deleteAliasChild(String alias);
+ public abstract void deleteAliasChild(String alias) throws DeleteFailedException;
/**
* get the child with the name
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 908d3e5..57d1950 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -78,9 +79,11 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.IOException;
import java.util.*;
+
import static org.apache.iotdb.db.conf.IoTDBConstant.*;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@@ -619,24 +622,31 @@ public class PlanExecutor implements IPlanExecutor {
Set<Path> registeredSeries = new HashSet<>();
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
String device = chunkGroupMetadata.getDevice();
- MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(device, true, sgLevel);
- for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
- Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
- if (!registeredSeries.contains(series)) {
- registeredSeries.add(series);
- MeasurementSchema schema = knownSchemas.get(series);
- if (schema == null) {
- throw new MetadataException(String.format("Can not get the schema of measurement [%s]",
- chunkMetadata.getMeasurementUid()));
- }
- if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
- mManager.createTimeseries(series.getFullPath(), schema.getType(),
- schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap());
- } else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) {
- throw new QueryProcessException(
- String.format("Current Path is not leaf node. %s", series));
+ MNode node = null;
+ try {
+ node = mManager.getDeviceNodeWithAutoCreateAndReadLock(device, true, sgLevel);
+ for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ if (!registeredSeries.contains(series)) {
+ registeredSeries.add(series);
+ MeasurementSchema schema = knownSchemas.get(series);
+ if (schema == null) {
+ throw new MetadataException(String.format("Can not get the schema of measurement [%s]",
+ chunkMetadata.getMeasurementUid()));
+ }
+ if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
+ mManager.createTimeseries(series.getFullPath(), schema.getType(),
+ schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap());
+ } else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) {
+ throw new QueryProcessException(
+ String.format("Current Path is not leaf node. %s", series));
+ }
}
}
+ } finally {
+ if (node != null) {
+ ((InternalMNode) node).readUnlock();
+ }
}
}
}
@@ -705,10 +715,11 @@ public class PlanExecutor implements IPlanExecutor {
@Override
public void insert(InsertPlan insertPlan) throws QueryProcessException {
+ MNode node = null;
try {
String[] measurementList = insertPlan.getMeasurements();
String deviceId = insertPlan.getDeviceId();
- MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(deviceId);
+ node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
String[] strValues = insertPlan.getValues();
MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
@@ -725,10 +736,15 @@ public class PlanExecutor implements IPlanExecutor {
LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
schemas[i] = measurementNode.getSchema();
}
+
insertPlan.setSchemas(schemas);
StorageEngine.getInstance().insert(insertPlan);
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
+ } finally {
+ if (node != null) {
+ ((InternalMNode) node).readUnlock();
+ }
}
}
@@ -774,10 +790,11 @@ public class PlanExecutor implements IPlanExecutor {
@Override
public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
+ MNode node = null;
try {
String[] measurementList = insertTabletPlan.getMeasurements();
String deviceId = insertTabletPlan.getDeviceId();
- MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(deviceId);
+ node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
TSDataType[] dataTypes = insertTabletPlan.getDataTypes();
IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
@@ -809,6 +826,10 @@ public class PlanExecutor implements IPlanExecutor {
return StorageEngine.getInstance().insertTablet(insertTabletPlan);
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
+ } finally {
+ if (node != null) {
+ ((InternalMNode) node).readUnlock();
+ }
}
}
@@ -900,12 +921,20 @@ public class PlanExecutor implements IPlanExecutor {
try {
deleteDataOfTimeSeries(deletePathList);
Set<String> emptyStorageGroups = new HashSet<>();
+ List<String> failedNames = new LinkedList<>();
for (Path path : deletePathList) {
- emptyStorageGroups.addAll(mManager.deleteTimeseries(path.toString()));
+ Pair<Set<String>, String> pair = mManager.deleteTimeseries(path.toString());
+ emptyStorageGroups.addAll(pair.left);
+ if (!pair.right.isEmpty()) {
+ failedNames.add(pair.right);
+ }
}
for (String deleteStorageGroup : emptyStorageGroups) {
StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(deleteStorageGroup);
}
+ if (!failedNames.isEmpty()) {
+ throw new DeleteFailedException(String.join(",", failedNames));
+ }
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 575e10c..f8a36c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -55,7 +55,6 @@ import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -305,6 +304,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
status = RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
resp.setStatus(status);
return resp;
+ } catch (Exception e) {
+ logger.error("Error in fetchMetadata : ", e);
+ status = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ resp.setStatus(status);
+ return resp;
}
resp.setStatus(status);
return resp;
@@ -398,6 +402,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} else {
return RpcUtils.getTSBatchExecuteStatementResp(result);
}
+ } catch (Exception e) {
+ logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1);
}
@@ -441,6 +448,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
result.add(
RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "query statement not allowed: " + statement));
return false;
+ } catch (Exception e) {
+ logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ result.add(RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " + e.getMessage()));
}
return true;
}
@@ -482,6 +493,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
logger.info(ERROR_PARSING_SQL, e.getMessage());
return RpcUtils.getTSExecuteStatementResp(
RpcUtils.getStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR, e.getMessage()));
+ } catch (Exception e) {
+ logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
@@ -519,6 +533,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
logger.error("check metadata error: ", e);
return RpcUtils.getTSExecuteStatementResp(
TSStatusCode.METADATA_ERROR, "Check metadata error: " + e.getMessage());
+ } catch (Exception e) {
+ logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
@@ -1382,6 +1400,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} catch (AuthException e) {
logger.error("meet error while checking authorization.", e);
return RpcUtils.getStatus(TSStatusCode.UNINITIALIZED_AUTH_ERROR, e.getMessage());
+ } catch (Exception e) {
+ logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
return null;
}
@@ -1393,6 +1414,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} catch (QueryProcessException e) {
logger.debug("meet error while processing non-query. ", e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ } catch (Exception e) {
+ logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
return execRet
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
index c1b4108..490c5af 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
@@ -191,8 +191,7 @@ public class IoTDBLastIT {
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- MNode node = MManager.getInstance()
- .getDeviceNodeWithAutoCreateStorageGroup("root.ln.wf01.wt02.temperature");
+ MNode node = MManager.getInstance().getNodeByPath("root.ln.wf01.wt02.temperature");
((LeafMNode) node).resetCache();
boolean hasResultSet =
statement.execute(
@@ -243,8 +242,7 @@ public class IoTDBLastIT {
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- MNode node = MManager.getInstance()
- .getDeviceNodeWithAutoCreateStorageGroup("root.ln.wf01.wt03.temperature");
+ MNode node = MManager.getInstance().getNodeByPath("root.ln.wf01.wt03.temperature");
((LeafMNode) node).resetCache();
statement.execute("INSERT INTO root.ln.wf01.wt03(timestamp,status, id) values(500, false, 9)");
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
index 1103719..3aaac3a 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
@@ -22,18 +22,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.metadata.mnode.InternalMNode;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.exception.cache.CacheException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.junit.After;
@@ -134,12 +131,19 @@ public class MManagerImproveTest {
}
private void doCacheTest(String deviceId, List<String> measurementList) throws MetadataException {
- MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(deviceId);
- for (String s : measurementList) {
- assertTrue(node.hasChild(s));
- LeafMNode measurementNode = (LeafMNode) node.getChild(s);
- TSDataType dataType = measurementNode.getSchema().getType();
- assertEquals(TSDataType.TEXT, dataType);
+ MNode node = null;
+ try {
+ node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
+ for (String s : measurementList) {
+ assertTrue(node.hasChild(s));
+ LeafMNode measurementNode = (LeafMNode) node.getChild(s);
+ TSDataType dataType = measurementNode.getSchema().getType();
+ assertEquals(TSDataType.TEXT, dataType);
+ }
+ } finally {
+ if (node != null) {
+ ((InternalMNode) node).readUnlock();
+ }
}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 93ef57b..4d11096 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -24,6 +24,7 @@ public enum TSStatusCode {
STILL_EXECUTING_STATUS(201),
INVALID_HANDLE_STATUS(202),
+ NODE_DELETE_FAILED_ERROR(298),
ALIAS_ALREADY_EXIST_ERROR(299),
PATH_ALREADY_EXIST_ERROR(300),
PATH_NOT_EXIST_ERROR(301),