You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/10 11:43:50 UTC

[incubator-iotdb] branch fix_deadlock_0.10 created (now 398dc84)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch fix_deadlock_0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 398dc84  fix deadlock of insert and show latest timeseries

This branch includes the following new commits:

     new 398dc84  fix deadlock of insert and show latest timeseries

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: fix deadlock of insert and show latest timeseries

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_deadlock_0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 398dc84dd0fa8af015eff34e38deda6e0bde1399
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jul 10 19:43:28 2020 +0800

    fix deadlock of insert and show latest timeseries
---
 .../engine/storagegroup/StorageGroupProcessor.java | 44 +++++++---------------
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  2 +
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 11 ++++++
 .../db/qp/physical/crud/InsertTabletPlan.java      | 11 ++++++
 4 files changed, 38 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b82bcce..eef78cf 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -761,21 +761,13 @@ public class StorageGroupProcessor {
 
   private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
       throws WriteProcessException {
-    MNode node = null;
-    try {
-      MManager manager = MManager.getInstance();
-      node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
-      String[] measurementList = plan.getMeasurements();
-      for (int i = 0; i < measurementList.length; i++) {
-        // Update cached last value with high priority
-        ((LeafMNode) manager.getChild(node, measurementList[i]))
-            .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
-      }
-    } catch (MetadataException e) {
-      throw new WriteProcessException(e);
-    } finally {
+    MNode node = plan.getDeviceMNode();
+    String[] measurementList = plan.getMeasurements();
+    for (int i = 0; i < measurementList.length; i++) {
+      // Update cached last value with high priority
       if (node != null) {
-        ((InternalMNode) node).readUnlock();
+        ((LeafMNode) node.getChild(measurementList[i]))
+            .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
       }
     }
   }
@@ -813,24 +805,16 @@ public class StorageGroupProcessor {
 
   private void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
       throws WriteProcessException {
-    MNode node = null;
-    try {
-      MManager manager = MManager.getInstance();
-      node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
-      String[] measurementList = plan.getMeasurements();
-      for (int i = 0; i < measurementList.length; i++) {
-        if (plan.getValues()[i] == null) {
-          continue;
-        }
-        // Update cached last value with high priority
-        ((LeafMNode) manager.getChild(node, measurementList[i]))
-            .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
+    MNode node = plan.getDeviceMNode();
+    String[] measurementList = plan.getMeasurements();
+    for (int i = 0; i < measurementList.length; i++) {
+      if (plan.getValues()[i] == null) {
+        continue;
       }
-    } catch (MetadataException e) {
-      throw new WriteProcessException(e);
-    } finally {
+      // Update cached last value with high priority
       if (node != null) {
-        ((InternalMNode) node).readUnlock();
+        ((LeafMNode) node.getChild(measurementList[i]))
+            .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 27c31e6..efa68f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -916,6 +916,7 @@ public class PlanExecutor implements IPlanExecutor {
 
       insertPlan.setMeasurements(measurementList);
       insertPlan.setSchemasAndTransferType(schemas);
+      insertPlan.setDeviceMNode(node);
       StorageEngine.getInstance().insert(insertPlan);
       if (insertPlan.getFailedMeasurements() != null) {
         throw new StorageEngineException(
@@ -1055,6 +1056,7 @@ public class PlanExecutor implements IPlanExecutor {
         measurementList[i] = measurementNode.getName();
       }
       insertTabletPlan.setSchemas(schemas);
+      insertTabletPlan.setDeviceMNode(node);
       return StorageEngine.getInstance().insertTablet(insertTabletPlan);
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index f617a25..611aa31 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -29,6 +29,7 @@ import java.util.Objects;
 import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -57,6 +58,8 @@ public class InsertPlan extends PhysicalPlan {
   private TSDataType[] types;
   private MeasurementSchema[] schemas;
 
+  private MNode deviceMNode;
+
   // if inferType is false, use the type of values directly
   // if inferType is true, values is String[], and infer types from them
   private boolean inferType = false;
@@ -362,6 +365,14 @@ public class InsertPlan extends PhysicalPlan {
     return failedMeasurements == null ? 0 : failedMeasurements.size();
   }
 
+  public MNode getDeviceMNode() {
+    return deviceMNode;
+  }
+
+  public void setDeviceMNode(MNode deviceMNode) {
+    this.deviceMNode = deviceMNode;
+  }
+
   public TSDataType[] getTypes() {
     return types;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 2928116..97e5781 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -56,6 +57,8 @@ public class InsertTabletPlan extends PhysicalPlan {
   private long[] times; // times should be sorted. It is done in the session API.
   private ByteBuffer timeBuffer;
 
+  private MNode deviceMNode;
+
   private Object[] columns;
   private ByteBuffer valueBuffer;
   private Set<Integer> index;
@@ -356,6 +359,14 @@ public class InsertTabletPlan extends PhysicalPlan {
     this.measurements = measurements;
   }
 
+  public MNode getDeviceMNode() {
+    return deviceMNode;
+  }
+
+  public void setDeviceMNode(MNode deviceMNode) {
+    this.deviceMNode = deviceMNode;
+  }
+
   public TSDataType[] getDataTypes() {
     return dataTypes;
   }