You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/06 09:01:08 UTC

[incubator-iotdb] branch master updated: fix concurrent auto create schema bug (#989)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a63bd82  fix concurrent auto create schema bug (#989)
a63bd82 is described below

commit a63bd82eed524d6927166e65e104501b4756b2bf
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Mon Apr 6 17:01:01 2020 +0800

    fix concurrent auto create schema bug (#989)
    
    * fix concurrent auto create schema bug
---
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  8 +++---
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 29 +++++++++++++++-----
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 31 +++++++++++++---------
 3 files changed, 44 insertions(+), 24 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index cdb3e6e..b243141 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -101,11 +101,11 @@ public class MTree implements Serializable {
     if (cur instanceof LeafMNode) {
       throw new PathAlreadyExistException(cur.getFullPath());
     }
-    MNode leaf = new LeafMNode(cur, nodeNames[nodeNames.length - 1], dataType, encoding,
-        compressor, props);
-    if (cur.hasChild(leaf.getName())) {
-      throw new MetadataException(String.format("The timeseries %s has already existed.", path));
+    String leafName = nodeNames[nodeNames.length - 1];
+    if (cur.hasChild(leafName)) {
+      throw new PathAlreadyExistException(path);
     }
+    MNode leaf = new LeafMNode(cur, leafName, dataType, encoding, compressor, props);
     cur.addChild(leaf);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index de6bfc7..82a7723 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -127,9 +128,12 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PlanExecutor implements IPlanExecutor {
 
+  private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
   // for data query
   protected IQueryRouter queryRouter;
   // for system schema
@@ -709,10 +713,7 @@ public class PlanExecutor implements IPlanExecutor {
           }
           TSDataType dataType = TypeInferenceUtils.getPredictedDataType(strValues[i]);
           Path path = new Path(deviceId, measurement);
-
-          mManager.createTimeseries(path.toString(), dataType, getDefaultEncoding(dataType),
-                  TSFileDescriptor.getInstance().getConfig().getCompressor(),
-              Collections.emptyMap());
+          internalCreateTimeseries(path.toString(), dataType);
         }
         LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
         schemas[i] = measurementNode.getSchema();
@@ -725,6 +726,22 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   /**
+   * Create a timeseries, ignoring PathAlreadyExistException
+   */
+  private void internalCreateTimeseries(String path, TSDataType dataType) throws MetadataException {
+    try {
+      mManager.createTimeseries(path, dataType, getDefaultEncoding(dataType),
+          TSFileDescriptor.getInstance().getConfig().getCompressor(),
+          Collections.emptyMap());
+    } catch (PathAlreadyExistException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Ignore PathAlreadyExistException when Concurrent inserting"
+            + " a non-exist time series {}", path);
+      }
+    }
+  }
+
+  /**
    * Get default encoding by dataType
    */
   private TSEncoding getDefaultEncoding(TSDataType dataType) {
@@ -768,9 +785,7 @@ public class PlanExecutor implements IPlanExecutor {
           }
           Path path = new Path(deviceId, measurementList[i]);
           TSDataType dataType = dataTypes[i];
-          mManager.createTimeseries(path.getFullPath(), dataType, getDefaultEncoding(dataType),
-                  TSFileDescriptor.getInstance().getConfig().getCompressor(),
-                  Collections.emptyMap());
+          internalCreateTimeseries(path.getFullPath(), dataType);
         }
         LeafMNode measurementNode = (LeafMNode) node.getChild(measurementList[i]);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index dc5e2e3..762c26d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1074,22 +1074,27 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   @Override
   public TSStatus insert(TSInsertReq req) {
-    if (!checkLogin(req.getSessionId())) {
-      logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
-      return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
-    }
+    try {
+      if (!checkLogin(req.getSessionId())) {
+        logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+        return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+      }
 
-    InsertPlan plan = new InsertPlan();
-    plan.setDeviceId(req.getDeviceId());
-    plan.setTime(req.getTimestamp());
-    plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
-    plan.setValues(req.getValues().toArray(new String[0]));
+      InsertPlan plan = new InsertPlan();
+      plan.setDeviceId(req.getDeviceId());
+      plan.setTime(req.getTimestamp());
+      plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
+      plan.setValues(req.getValues().toArray(new String[0]));
 
-    TSStatus status = checkAuthority(plan, req.getSessionId());
-    if (status != null) {
-      return status;
+      TSStatus status = checkAuthority(plan, req.getSessionId());
+      if (status != null) {
+        return status;
+      }
+      return executePlan(plan);
+    } catch (Exception e) {
+      logger.error("meet error when insert", e);
     }
-    return executePlan(plan);
+    return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
   }
 
   @Override