You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/10 11:43:51 UTC

[incubator-iotdb] 01/01: fix deadlock of insert and show latest timeseries

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_deadlock_0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 398dc84dd0fa8af015eff34e38deda6e0bde1399
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jul 10 19:43:28 2020 +0800

    fix deadlock of insert and show latest timeseries
---
 .../engine/storagegroup/StorageGroupProcessor.java | 44 +++++++---------------
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  2 +
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 11 ++++++
 .../db/qp/physical/crud/InsertTabletPlan.java      | 11 ++++++
 4 files changed, 38 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b82bcce..eef78cf 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -761,21 +761,13 @@ public class StorageGroupProcessor {
 
   private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
       throws WriteProcessException {
-    MNode node = null;
-    try {
-      MManager manager = MManager.getInstance();
-      node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
-      String[] measurementList = plan.getMeasurements();
-      for (int i = 0; i < measurementList.length; i++) {
-        // Update cached last value with high priority
-        ((LeafMNode) manager.getChild(node, measurementList[i]))
-            .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
-      }
-    } catch (MetadataException e) {
-      throw new WriteProcessException(e);
-    } finally {
+    MNode node = plan.getDeviceMNode();
+    String[] measurementList = plan.getMeasurements();
+    for (int i = 0; i < measurementList.length; i++) {
+      // Update cached last value with high priority
       if (node != null) {
-        ((InternalMNode) node).readUnlock();
+        ((LeafMNode) node.getChild(measurementList[i]))
+            .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
       }
     }
   }
@@ -813,24 +805,16 @@ public class StorageGroupProcessor {
 
   private void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
       throws WriteProcessException {
-    MNode node = null;
-    try {
-      MManager manager = MManager.getInstance();
-      node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
-      String[] measurementList = plan.getMeasurements();
-      for (int i = 0; i < measurementList.length; i++) {
-        if (plan.getValues()[i] == null) {
-          continue;
-        }
-        // Update cached last value with high priority
-        ((LeafMNode) manager.getChild(node, measurementList[i]))
-            .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
+    MNode node = plan.getDeviceMNode();
+    String[] measurementList = plan.getMeasurements();
+    for (int i = 0; i < measurementList.length; i++) {
+      if (plan.getValues()[i] == null) {
+        continue;
       }
-    } catch (MetadataException e) {
-      throw new WriteProcessException(e);
-    } finally {
+      // Update cached last value with high priority
       if (node != null) {
-        ((InternalMNode) node).readUnlock();
+        ((LeafMNode) node.getChild(measurementList[i]))
+            .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 27c31e6..efa68f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -916,6 +916,7 @@ public class PlanExecutor implements IPlanExecutor {
 
       insertPlan.setMeasurements(measurementList);
       insertPlan.setSchemasAndTransferType(schemas);
+      insertPlan.setDeviceMNode(node);
       StorageEngine.getInstance().insert(insertPlan);
       if (insertPlan.getFailedMeasurements() != null) {
         throw new StorageEngineException(
@@ -1055,6 +1056,7 @@ public class PlanExecutor implements IPlanExecutor {
         measurementList[i] = measurementNode.getName();
       }
       insertTabletPlan.setSchemas(schemas);
+      insertTabletPlan.setDeviceMNode(node);
       return StorageEngine.getInstance().insertTablet(insertTabletPlan);
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index f617a25..611aa31 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -29,6 +29,7 @@ import java.util.Objects;
 import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -57,6 +58,8 @@ public class InsertPlan extends PhysicalPlan {
   private TSDataType[] types;
   private MeasurementSchema[] schemas;
 
+  private MNode deviceMNode;
+
   // if inferType is false, use the type of values directly
   // if inferType is true, values is String[], and infer types from them
   private boolean inferType = false;
@@ -362,6 +365,14 @@ public class InsertPlan extends PhysicalPlan {
     return failedMeasurements == null ? 0 : failedMeasurements.size();
   }
 
+  public MNode getDeviceMNode() {
+    return deviceMNode;
+  }
+
+  public void setDeviceMNode(MNode deviceMNode) {
+    this.deviceMNode = deviceMNode;
+  }
+
   public TSDataType[] getTypes() {
     return types;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 2928116..97e5781 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -56,6 +57,8 @@ public class InsertTabletPlan extends PhysicalPlan {
   private long[] times; // times should be sorted. It is done in the session API.
   private ByteBuffer timeBuffer;
 
+  private MNode deviceMNode;
+
   private Object[] columns;
   private ByteBuffer valueBuffer;
   private Set<Integer> index;
@@ -356,6 +359,14 @@ public class InsertTabletPlan extends PhysicalPlan {
     this.measurements = measurements;
   }
 
+  public MNode getDeviceMNode() {
+    return deviceMNode;
+  }
+
+  public void setDeviceMNode(MNode deviceMNode) {
+    this.deviceMNode = deviceMNode;
+  }
+
   public TSDataType[] getDataTypes() {
     return dataTypes;
   }