You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/10 11:20:42 UTC
[incubator-iotdb] 01/01: do not get MManager.readLock when inserting
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_deadlock
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 099a1afa8322f65d4e1ff7216fa2e867374d1164
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jul 10 19:20:08 2020 +0800
do not get MManager.readLock when inserting
---
.../main/java/org/apache/iotdb/SessionExample.java | 2 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 8 +--
.../engine/storagegroup/StorageGroupProcessor.java | 62 +++++++---------------
.../org/apache/iotdb/db/metadata/MManager.java | 3 ++
.../iotdb/db/qp/physical/crud/InsertPlan.java | 12 +++++
5 files changed, 36 insertions(+), 51 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 3a63fec..06a5f21 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -40,7 +40,7 @@ public class SessionExample {
private static Session session;
public static void main(String[] args)
- throws IoTDBConnectionException, StatementExecutionException, BatchExecutionException {
+ throws IoTDBConnectionException, StatementExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 34bd8c0..9249e6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -300,8 +300,6 @@ public class StorageEngine implements IService {
/**
* insert a InsertTabletPlan to a storage group
- *
- * @return result of each row
*/
public void insertTablet(InsertTabletPlan insertTabletPlan)
throws StorageEngineException, BatchInsertionException {
@@ -314,11 +312,7 @@ public class StorageEngine implements IService {
}
// TODO monitor: update statistics
- try {
- storageGroupProcessor.insertTablet(insertTabletPlan);
- } catch (WriteProcessException e) {
- throw new StorageEngineException(e);
- }
+ storageGroupProcessor.insertTablet(insertTabletPlan);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a02cbcb..cbd798a 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -632,12 +632,9 @@ public class StorageGroupProcessor {
/**
* Insert a tablet (rows belonging to the same devices) into this storage group.
- * @param insertTabletPlan
- * @throws WriteProcessException when update last cache failed
* @throws BatchInsertionException if some of the rows failed to be inserted
*/
- public void insertTablet(InsertTabletPlan insertTabletPlan) throws WriteProcessException,
- BatchInsertionException {
+ public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
writeLock();
try {
TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
@@ -780,26 +777,17 @@ public class StorageGroupProcessor {
return true;
}
- private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
- throws WriteProcessException {
- MNode node = null;
- try {
- MManager manager = IoTDB.metaManager;
- node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
- String[] measurementList = plan.getMeasurements();
- for (int i = 0; i < measurementList.length; i++) {
- if (plan.getColumns()[i] == null) {
- continue;
- }
- // Update cached last value with high priority
- ((MeasurementMNode) manager.getChild(node, measurementList[i]))
- .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
+ private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime) {
+ MNode node = plan.getDeviceMNode();
+ String[] measurementList = plan.getMeasurements();
+ for (int i = 0; i < measurementList.length; i++) {
+ if (plan.getColumns()[i] == null) {
+ continue;
}
- } catch (MetadataException e) {
- throw new WriteProcessException(e);
- } finally {
if (node != null) {
- node.readUnlock();
+ // Update cached last value with high priority
+ ((MeasurementMNode) node.getChild(measurementList[i]))
+ .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
}
}
}
@@ -835,29 +823,17 @@ public class StorageGroupProcessor {
}
}
- private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime)
- throws WriteProcessException {
- MNode node = null;
- try {
- MManager manager = IoTDB.metaManager;
- node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
- String[] measurementList = plan.getMeasurements();
- for (int i = 0; i < measurementList.length; i++) {
- if (plan.getValues()[i] == null) {
- continue;
- }
- // Update cached last value with high priority
- MNode measurementNode = manager.getChild(node, measurementList[i]);
- if (measurementNode != null) {
- ((MeasurementMNode) measurementNode)
- .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
- }
+ private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) {
+ MNode node = plan.getDeviceMNode();
+ String[] measurementList = plan.getMeasurements();
+ for (int i = 0; i < measurementList.length; i++) {
+ if (plan.getValues()[i] == null) {
+ continue;
}
- } catch (MetadataException e) {
- // skip last cache update if the local MTree does not contain the schema
- } finally {
+ // Update cached last value with high priority
if (node != null) {
- node.readUnlock();
+ ((MeasurementMNode) node.getChild(measurementList[i])).
+ updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index c0efdfc..471ab9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -1969,6 +1969,9 @@ public class MManager {
}
}
}
+
+ plan.setDeviceMNode(deviceNode);
+
return schemas;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 2914779..36a1174 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.crud;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -33,6 +34,9 @@ abstract public class InsertPlan extends PhysicalPlan {
protected TSDataType[] dataTypes;
protected MeasurementSchema[] schemas;
+ // for updating last cache
+ private MNode deviceMNode;
+
// record the failed measurements
protected List<String> failedMeasurements;
@@ -81,6 +85,14 @@ abstract public class InsertPlan extends PhysicalPlan {
return failedMeasurements == null ? 0 : failedMeasurements.size();
}
+ public MNode getDeviceMNode() {
+ return deviceMNode;
+ }
+
+ public void setDeviceMNode(MNode deviceMNode) {
+ this.deviceMNode = deviceMNode;
+ }
+
/**
* @param index failed measurement index
*/