You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/10 11:20:42 UTC

[incubator-iotdb] 01/01: do not get MManager.readLock when inserting

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_deadlock
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 099a1afa8322f65d4e1ff7216fa2e867374d1164
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jul 10 19:20:08 2020 +0800

    do not get MManager.readLock when inserting
---
 .../main/java/org/apache/iotdb/SessionExample.java |  2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  8 +--
 .../engine/storagegroup/StorageGroupProcessor.java | 62 +++++++---------------
 .../org/apache/iotdb/db/metadata/MManager.java     |  3 ++
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 12 +++++
 5 files changed, 36 insertions(+), 51 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 3a63fec..06a5f21 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -40,7 +40,7 @@ public class SessionExample {
   private static Session session;
 
   public static void main(String[] args)
-      throws IoTDBConnectionException, StatementExecutionException, BatchExecutionException {
+      throws IoTDBConnectionException, StatementExecutionException {
     session = new Session("127.0.0.1", 6667, "root", "root");
     session.open(false);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 34bd8c0..9249e6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -300,8 +300,6 @@ public class StorageEngine implements IService {
 
   /**
    * insert a InsertTabletPlan to a storage group
-   *
-   * @return result of each row
    */
   public void insertTablet(InsertTabletPlan insertTabletPlan)
       throws StorageEngineException, BatchInsertionException {
@@ -314,11 +312,7 @@ public class StorageEngine implements IService {
     }
 
     // TODO monitor: update statistics
-    try {
-      storageGroupProcessor.insertTablet(insertTabletPlan);
-    } catch (WriteProcessException e) {
-      throw new StorageEngineException(e);
-    }
+    storageGroupProcessor.insertTablet(insertTabletPlan);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a02cbcb..cbd798a 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -632,12 +632,9 @@ public class StorageGroupProcessor {
 
   /**
    * Insert a tablet (rows belonging to the same devices) into this storage group.
-   * @param insertTabletPlan
-   * @throws WriteProcessException when update last cache failed
    * @throws BatchInsertionException if some of the rows failed to be inserted
    */
-  public void insertTablet(InsertTabletPlan insertTabletPlan) throws WriteProcessException,
-      BatchInsertionException {
+  public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
     writeLock();
     try {
       TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
@@ -780,26 +777,17 @@ public class StorageGroupProcessor {
     return true;
   }
 
-  private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
-      throws WriteProcessException {
-    MNode node = null;
-    try {
-      MManager manager = IoTDB.metaManager;
-      node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
-      String[] measurementList = plan.getMeasurements();
-      for (int i = 0; i < measurementList.length; i++) {
-        if (plan.getColumns()[i] == null) {
-          continue;
-        }
-        // Update cached last value with high priority
-        ((MeasurementMNode) manager.getChild(node, measurementList[i]))
-            .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
+  private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime) {
+    MNode node = plan.getDeviceMNode();
+    String[] measurementList = plan.getMeasurements();
+    for (int i = 0; i < measurementList.length; i++) {
+      if (plan.getColumns()[i] == null) {
+        continue;
       }
-    } catch (MetadataException e) {
-      throw new WriteProcessException(e);
-    } finally {
       if (node != null) {
-        node.readUnlock();
+        // Update cached last value with high priority
+        ((MeasurementMNode) node.getChild(measurementList[i]))
+            .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
       }
     }
   }
@@ -835,29 +823,17 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime)
-      throws WriteProcessException {
-    MNode node = null;
-    try {
-      MManager manager = IoTDB.metaManager;
-      node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
-      String[] measurementList = plan.getMeasurements();
-      for (int i = 0; i < measurementList.length; i++) {
-        if (plan.getValues()[i] == null) {
-          continue;
-        }
-        // Update cached last value with high priority
-        MNode measurementNode = manager.getChild(node, measurementList[i]);
-        if (measurementNode != null) {
-          ((MeasurementMNode) measurementNode)
-              .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
-        }
+  private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) {
+    MNode node = plan.getDeviceMNode();
+    String[] measurementList = plan.getMeasurements();
+    for (int i = 0; i < measurementList.length; i++) {
+      if (plan.getValues()[i] == null) {
+        continue;
       }
-    } catch (MetadataException e) {
-      // skip last cache update if the local MTree does not contain the schema
-    } finally {
+      // Update cached last value with high priority
       if (node != null) {
-        node.readUnlock();
+        ((MeasurementMNode) node.getChild(measurementList[i])).
+            updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index c0efdfc..471ab9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1969,6 +1969,9 @@ public class MManager {
         }
       }
     }
+
+    plan.setDeviceMNode(deviceNode);
+
     return schemas;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 2914779..36a1174 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.crud;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -33,6 +34,9 @@ abstract public class InsertPlan extends PhysicalPlan {
   protected TSDataType[] dataTypes;
   protected MeasurementSchema[] schemas;
 
+  // for updating last cache
+  private MNode deviceMNode;
+
   // record the failed measurements
   protected List<String> failedMeasurements;
 
@@ -81,6 +85,14 @@ abstract public class InsertPlan extends PhysicalPlan {
     return failedMeasurements == null ? 0 : failedMeasurements.size();
   }
 
+  public MNode getDeviceMNode() {
+    return deviceMNode;
+  }
+
+  public void setDeviceMNode(MNode deviceMNode) {
+    this.deviceMNode = deviceMNode;
+  }
+
   /**
    * @param index failed measurement index
    */