You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/06 09:01:08 UTC
[incubator-iotdb] branch master updated: fix concurrent auto create
schema bug (#989)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a63bd82 fix concurrent auto create schema bug (#989)
a63bd82 is described below
commit a63bd82eed524d6927166e65e104501b4756b2bf
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Mon Apr 6 17:01:01 2020 +0800
fix concurrent auto create schema bug (#989)
* fix concurrent auto create schema bug
---
.../java/org/apache/iotdb/db/metadata/MTree.java | 8 +++---
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 29 +++++++++++++++-----
.../org/apache/iotdb/db/service/TSServiceImpl.java | 31 +++++++++++++---------
3 files changed, 44 insertions(+), 24 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index cdb3e6e..b243141 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -101,11 +101,11 @@ public class MTree implements Serializable {
if (cur instanceof LeafMNode) {
throw new PathAlreadyExistException(cur.getFullPath());
}
- MNode leaf = new LeafMNode(cur, nodeNames[nodeNames.length - 1], dataType, encoding,
- compressor, props);
- if (cur.hasChild(leaf.getName())) {
- throw new MetadataException(String.format("The timeseries %s has already existed.", path));
+ String leafName = nodeNames[nodeNames.length - 1];
+ if (cur.hasChild(leafName)) {
+ throw new PathAlreadyExistException(path);
}
+ MNode leaf = new LeafMNode(cur, leafName, dataType, encoding, compressor, props);
cur.addChild(leaf);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index de6bfc7..82a7723 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
@@ -127,9 +128,12 @@ import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PlanExecutor implements IPlanExecutor {
+ private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
// for data query
protected IQueryRouter queryRouter;
// for system schema
@@ -709,10 +713,7 @@ public class PlanExecutor implements IPlanExecutor {
}
TSDataType dataType = TypeInferenceUtils.getPredictedDataType(strValues[i]);
Path path = new Path(deviceId, measurement);
-
- mManager.createTimeseries(path.toString(), dataType, getDefaultEncoding(dataType),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
- Collections.emptyMap());
+ internalCreateTimeseries(path.toString(), dataType);
}
LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
schemas[i] = measurementNode.getSchema();
@@ -725,6 +726,22 @@ public class PlanExecutor implements IPlanExecutor {
}
/**
+ * create timeseries with ignore PathAlreadyExistException
+ */
+ private void internalCreateTimeseries(String path, TSDataType dataType) throws MetadataException {
+ try {
+ mManager.createTimeseries(path, dataType, getDefaultEncoding(dataType),
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ } catch (PathAlreadyExistException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ignore PathAlreadyExistException when Concurrent inserting"
+ + " a non-exist time series {}", path);
+ }
+ }
+ }
+
+ /**
* Get default encoding by dataType
*/
private TSEncoding getDefaultEncoding(TSDataType dataType) {
@@ -768,9 +785,7 @@ public class PlanExecutor implements IPlanExecutor {
}
Path path = new Path(deviceId, measurementList[i]);
TSDataType dataType = dataTypes[i];
- mManager.createTimeseries(path.getFullPath(), dataType, getDefaultEncoding(dataType),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
- Collections.emptyMap());
+ internalCreateTimeseries(path.getFullPath(), dataType);
}
LeafMNode measurementNode = (LeafMNode) node.getChild(measurementList[i]);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index dc5e2e3..762c26d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1074,22 +1074,27 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSStatus insert(TSInsertReq req) {
- if (!checkLogin(req.getSessionId())) {
- logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
- return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
- }
+ try {
+ if (!checkLogin(req.getSessionId())) {
+ logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+ return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+ }
- InsertPlan plan = new InsertPlan();
- plan.setDeviceId(req.getDeviceId());
- plan.setTime(req.getTimestamp());
- plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
- plan.setValues(req.getValues().toArray(new String[0]));
+ InsertPlan plan = new InsertPlan();
+ plan.setDeviceId(req.getDeviceId());
+ plan.setTime(req.getTimestamp());
+ plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
+ plan.setValues(req.getValues().toArray(new String[0]));
- TSStatus status = checkAuthority(plan, req.getSessionId());
- if (status != null) {
- return status;
+ TSStatus status = checkAuthority(plan, req.getSessionId());
+ if (status != null) {
+ return status;
+ }
+ return executePlan(plan);
+ } catch (Exception e) {
+ logger.error("meet error when insert", e);
}
- return executePlan(plan);
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
@Override