You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/04/22 10:45:58 UTC
[incubator-iotdb] 03/03: add a lock in each MNode
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch DeleteInsertBug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 34eac9a9cda9333a596b9a93386b6311107bc752
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Wed Apr 22 18:45:31 2020 +0800
add a lock in each MNode
---
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 12 +---
.../engine/storagegroup/StorageGroupProcessor.java | 56 +++++++---------
.../exception/metadata/DeleteFailedException.java | 38 +++++++++++
.../org/apache/iotdb/db/metadata/MManager.java | 76 ++++++++++++++--------
.../iotdb/db/metadata/mnode/InternalMNode.java | 36 ++++++++--
.../org/apache/iotdb/db/metadata/mnode/MNode.java | 25 ++++++-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 72 +++++++++++++-------
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
8 files changed, 216 insertions(+), 100 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 278d1f9..b54cbbe 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -173,19 +173,9 @@ public class IoTDBStatement implements Statement {
try {
return executeSQL(sql);
} catch (TException e) {
- if (reConnect()) {
- try {
- return executeSQL(sql);
- } catch (TException e2) {
- throw new SQLException(
- String.format("Fail to execute %s after reconnecting. please check server status",
- sql), e2);
- }
- } else {
- throw new SQLException(String
+ throw new SQLException(String
.format("Fail to reconnect to server when executing %s. please check server status",
sql), e);
- }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1ac1e85..f5b0f15 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,27 +18,6 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -61,12 +40,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.LoadFileException;
-import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
-import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.*;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -95,6 +69,17 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
* TsFileProcessor in the working status. <br/>
@@ -625,9 +610,9 @@ public class StorageGroupProcessor {
public void tryToUpdateBatchInsertLastCache(BatchInsertPlan plan, Long latestFlushedTime)
throws WriteProcessException {
+ MNode node = null;
try {
- MNode node =
- MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
+ node = MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
String[] measurementList = plan.getMeasurements();
for (int i = 0; i < measurementList.length; i++) {
// Update cached last value with high priority
@@ -637,6 +622,10 @@ public class StorageGroupProcessor {
}
} catch (MetadataException e) {
throw new WriteProcessException(e);
+ } finally {
+ if (node != null) {
+ node.readUnlock();
+ }
}
}
@@ -673,18 +662,23 @@ public class StorageGroupProcessor {
public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
throws WriteProcessException {
+ MNode node = null;
try {
- MNode node =
- MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
+ node = MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId());
String[] measurementList = plan.getMeasurements();
for (int i = 0; i < measurementList.length; i++) {
// Update cached last value with high priority
MNode measurementNode = node.getChild(measurementList[i]);
+
((LeafMNode) measurementNode)
.updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
}
} catch (MetadataException | QueryProcessException e) {
throw new WriteProcessException(e);
+ } finally {
+ if (node != null) {
+ node.readUnlock();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/DeleteFailedException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DeleteFailedException.java
new file mode 100644
index 0000000..a4e46e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DeleteFailedException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class DeleteFailedException extends MetadataException {
+
+ private String name;
+
+ public DeleteFailedException(String name) {
+ super(String.format("Node [%s] is being used. Deletion failed.", name),
+ TSStatusCode.NODE_DELETE_FAILED_ERROR.getStatusCode());
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 110e3d2..f5854e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -25,11 +25,7 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.metadata.*;
import org.apache.iotdb.db.metadata.mnode.InternalMNode;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
@@ -254,9 +250,13 @@ public class MManager {
createTimeseries(plan, offset);
break;
case MetadataOperationType.DELETE_TIMESERIES:
- for (String deleteStorageGroup : deleteTimeseries(args[1])) {
+ Pair<Set<String>, String> pair = deleteTimeseries(args[1]);
+ for (String deleteStorageGroup : pair.left) {
StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(deleteStorageGroup);
}
+ if (!pair.right.isEmpty()) {
+ throw new DeleteFailedException(pair.right);
+ }
break;
case MetadataOperationType.SET_STORAGE_GROUP:
setStorageGroup(args[1]);
@@ -369,7 +369,7 @@ public class MManager {
* @return a set contains StorageGroups that contain no more timeseries after this deletion and
* files of such StorageGroups should be deleted to reclaim disk space.
*/
- public Set<String> deleteTimeseries(String prefixPath) throws MetadataException {
+ public Pair<Set<String>, String> deleteTimeseries(String prefixPath) throws MetadataException {
lock.writeLock().lock();
if (isStorageGroup(prefixPath)) {
@@ -391,13 +391,18 @@ public class MManager {
// Monitor storage group seriesPath is not allowed to be deleted
allTimeseries.removeIf(p -> p.startsWith(MonitorConstants.STAT_STORAGE_GROUP_PREFIX));
+ Set<String> failedNames = new HashSet<>();
for (String p : allTimeseries) {
- String emptyStorageGroup = deleteOneTimeseriesAndUpdateStatisticsAndLog(p);
- if (emptyStorageGroup != null) {
- emptyStorageGroups.add(emptyStorageGroup);
+ try {
+ String emptyStorageGroup = deleteOneTimeseriesAndUpdateStatisticsAndLog(p);
+ if (emptyStorageGroup != null) {
+ emptyStorageGroups.add(emptyStorageGroup);
+ }
+ } catch (DeleteFailedException e) {
+ failedNames.add(e.getName());
}
}
- return emptyStorageGroups;
+ return new Pair<>(emptyStorageGroups, String.join(",", failedNames));
} catch (IOException e) {
throw new MetadataException(e.getMessage());
} finally {
@@ -843,40 +848,59 @@ public class MManager {
/**
* get device node, if the storage group is not set, create it when autoCreateSchema is true
*
+ * !!!!!!Attention!!!!!
+ * must call the returned node's readUnlock() if you call this method.
* @param path path
*/
public MNode getDeviceNodeWithAutoCreateStorageGroup(String path, boolean autoCreateSchema,
int sgLevel) throws MetadataException {
lock.readLock().lock();
MNode node = null;
- boolean shouldSetStorageGroup = false;
+ boolean shouldSetStorageGroup;
try {
node = mNodeCache.get(path);
+ return node;
} catch (CacheException e) {
if (!autoCreateSchema) {
throw new PathNotExistException(path);
- } else {
- shouldSetStorageGroup = e.getCause() instanceof StorageGroupNotSetException;
}
} finally {
+ if (node != null) {
+ node.readLock();
+ }
lock.readLock().unlock();
- lock.writeLock().lock();
+ }
+
+ lock.writeLock().lock();
+ try {
try {
- if (autoCreateSchema) {
- if (shouldSetStorageGroup) {
- String storageGroupName = MetaUtils.getStorageGroupNameByLevel(path, sgLevel);
- setStorageGroup(storageGroupName);
- }
- node = mtree.getDeviceNodeWithAutoCreating(path);
- }
- } catch (StorageGroupAlreadySetException e) {
- // ignore set storage group concurrently
- node = mtree.getDeviceNodeWithAutoCreating(path);
+ node = mNodeCache.get(path);
+ return node;
+ } catch (CacheException e) {
+ shouldSetStorageGroup = e.getCause() instanceof StorageGroupNotSetException;
} finally {
+ if (node != null) {
+ node.readLock();
+ }
lock.writeLock().unlock();
}
+
+ if (shouldSetStorageGroup) {
+ String storageGroupName = MetaUtils.getStorageGroupNameByLevel(path, sgLevel);
+ setStorageGroup(storageGroupName);
+ }
+ node = mtree.getDeviceNodeWithAutoCreating(path);
+ return node;
+ } catch (StorageGroupAlreadySetException e) {
+ // ignore set storage group concurrently
+ node = mtree.getDeviceNodeWithAutoCreating(path);
+ return node;
+ } finally {
+ if (node != null) {
+ node.readLock();
+ }
+ lock.writeLock().unlock();
}
- return node;
}
public MNode getDeviceNodeWithAutoCreateStorageGroup(String path) throws MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java
index 4e35729..a19c46c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/InternalMNode.java
@@ -18,8 +18,13 @@
*/
package org.apache.iotdb.db.metadata.mnode;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
public class InternalMNode extends MNode {
@@ -46,13 +51,34 @@ public class InternalMNode extends MNode {
@Override
- public void deleteChild(String name) {
- children.remove(name);
- }
+ public void deleteChild(String name) throws DeleteFailedException {
+ if (children.containsKey(name)) {
+ Lock writeLock;
+ // if the child node is a leaf node, we need to acquire the write lock of the current device node
+ if (children.get(name) instanceof LeafMNode) {
+ writeLock = lock.writeLock();
+ } else {
+ // otherwise, we only need to acquire the write lock of its child node.
+ writeLock = children.get(name).lock.writeLock();
+ }
+ if (writeLock.tryLock()) {
+ children.remove(name);
+ writeLock.unlock();
+ } else {
+ throw new DeleteFailedException(getFullPath() + PATH_SEPARATOR + name);
+ }
+ }
+}
@Override
- public void deleteAliasChild(String alias) {
- aliasChildren.remove(alias);
+ public void deleteAliasChild(String alias) throws DeleteFailedException {
+
+ if (lock.writeLock().tryLock()) {
+ aliasChildren.remove(alias);
+ lock.writeLock().unlock();
+ } else {
+ throw new DeleteFailedException(getFullPath() + PATH_SEPARATOR + alias);
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
index ff44cbf..e1f6fce 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
@@ -19,9 +19,12 @@
package org.apache.iotdb.db.metadata.mnode;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import java.io.Serializable;
import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This class is the implementation of Metadata Node. One MNode instance represents one node in the
@@ -43,6 +46,8 @@ public abstract class MNode implements Serializable {
*/
protected String fullPath;
+ protected ReadWriteLock lock = new ReentrantReadWriteLock();
+
/**
* Constructor of MNode.
*/
@@ -64,12 +69,12 @@ public abstract class MNode implements Serializable {
/**
* delete a child
*/
- public abstract void deleteChild(String name);
+ public abstract void deleteChild(String name) throws DeleteFailedException;
/**
* delete the alias of a child
*/
- public abstract void deleteAliasChild(String alias);
+ public abstract void deleteAliasChild(String alias) throws DeleteFailedException;
/**
* get the child with the name
@@ -121,4 +126,20 @@ public abstract class MNode implements Serializable {
public void setName(String name) {
this.name = name;
}
+
+ public void readLock() {
+ MNode node = this;
+ while (node != null) {
+ node.lock.readLock().lock();
+ node = node.parent;
+ }
+ }
+
+ public void readUnlock() {
+ MNode node = this;
+ while (node != null) {
+ node.lock.readLock().unlock();
+ node = node.parent;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 0c0cb57..b6c7d96 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -78,10 +79,10 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.conf.IoTDBConstant.*;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
@@ -621,24 +622,31 @@ public class PlanExecutor implements IPlanExecutor {
Set<Path> registeredSeries = new HashSet<>();
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
String device = chunkGroupMetadata.getDevice();
- MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(device, true, sgLevel);
- for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
- Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
- if (!registeredSeries.contains(series)) {
- registeredSeries.add(series);
- MeasurementSchema schema = knownSchemas.get(series);
- if (schema == null) {
- throw new MetadataException(String.format("Can not get the schema of measurement [%s]",
- chunkMetadata.getMeasurementUid()));
- }
- if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
- mManager.createTimeseries(series.getFullPath(), schema.getType(),
- schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap());
- } else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) {
- throw new QueryProcessException(
- String.format("Current Path is not leaf node. %s", series));
+ MNode node = null;
+ try {
+ node = mManager.getDeviceNodeWithAutoCreateStorageGroup(device, true, sgLevel);
+ for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ if (!registeredSeries.contains(series)) {
+ registeredSeries.add(series);
+ MeasurementSchema schema = knownSchemas.get(series);
+ if (schema == null) {
+ throw new MetadataException(String.format("Can not get the schema of measurement [%s]",
+ chunkMetadata.getMeasurementUid()));
+ }
+ if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
+ mManager.createTimeseries(series.getFullPath(), schema.getType(),
+ schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap());
+ } else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) {
+ throw new QueryProcessException(
+ String.format("Current Path is not leaf node. %s", series));
+ }
}
}
+ } finally {
+ if (node != null) {
+ node.readUnlock();
+ }
}
}
}
@@ -707,10 +715,11 @@ public class PlanExecutor implements IPlanExecutor {
@Override
public void insert(InsertPlan insertPlan) throws QueryProcessException {
+ MNode node = null;
try {
String[] measurementList = insertPlan.getMeasurements();
String deviceId = insertPlan.getDeviceId();
- MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(deviceId);
+ node = mManager.getDeviceNodeWithAutoCreateStorageGroup(deviceId);
String[] strValues = insertPlan.getValues();
MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
@@ -727,15 +736,15 @@ public class PlanExecutor implements IPlanExecutor {
LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
schemas[i] = measurementNode.getSchema();
}
- try {
- TimeUnit.SECONDS.sleep(4);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+
insertPlan.setSchemas(schemas);
StorageEngine.getInstance().insert(insertPlan);
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
+ } finally {
+ if (node != null) {
+ node.readUnlock();
+ }
}
}
@@ -781,10 +790,11 @@ public class PlanExecutor implements IPlanExecutor {
@Override
public TSStatus[] insertBatch(BatchInsertPlan batchInsertPlan) throws QueryProcessException {
+ MNode node = null;
try {
String[] measurementList = batchInsertPlan.getMeasurements();
String deviceId = batchInsertPlan.getDeviceId();
- MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(deviceId);
+ node = mManager.getDeviceNodeWithAutoCreateStorageGroup(deviceId);
TSDataType[] dataTypes = batchInsertPlan.getDataTypes();
IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
MeasurementSchema[] schemas = new MeasurementSchema[measurementList.length];
@@ -816,6 +826,10 @@ public class PlanExecutor implements IPlanExecutor {
return StorageEngine.getInstance().insertBatch(batchInsertPlan);
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
+ } finally {
+ if (node != null) {
+ node.readUnlock();
+ }
}
}
@@ -907,12 +921,20 @@ public class PlanExecutor implements IPlanExecutor {
try {
deleteDataOfTimeSeries(deletePathList);
Set<String> emptyStorageGroups = new HashSet<>();
+ List<String> failedNames = new LinkedList<>();
for (Path path : deletePathList) {
- emptyStorageGroups.addAll(mManager.deleteTimeseries(path.toString()));
+ Pair<Set<String>, String> pair = mManager.deleteTimeseries(path.toString());
+ emptyStorageGroups.addAll(pair.left);
+ if (!pair.right.isEmpty()) {
+ failedNames.add(pair.right);
+ }
}
for (String deleteStorageGroup : emptyStorageGroups) {
StorageEngine.getInstance().deleteAllDataFilesInOneStorageGroup(deleteStorageGroup);
}
+ if (!failedNames.isEmpty()) {
+ throw new DeleteFailedException(String.join(",", failedNames));
+ }
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 93ef57b..4d11096 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -24,6 +24,7 @@ public enum TSStatusCode {
STILL_EXECUTING_STATUS(201),
INVALID_HANDLE_STATUS(202),
+ NODE_DELETE_FAILED_ERROR(298),
ALIAS_ALREADY_EXIST_ERROR(299),
PATH_ALREADY_EXIST_ERROR(300),
PATH_NOT_EXIST_ERROR(301),