You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/10 11:43:51 UTC
[incubator-iotdb] 01/01: fix deadlock of insert and show latest timeseries
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_deadlock_0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 398dc84dd0fa8af015eff34e38deda6e0bde1399
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jul 10 19:43:28 2020 +0800
fix deadlock of insert and show latest timeseries
---
.../engine/storagegroup/StorageGroupProcessor.java | 44 +++++++---------------
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +
.../iotdb/db/qp/physical/crud/InsertPlan.java | 11 ++++++
.../db/qp/physical/crud/InsertTabletPlan.java | 11 ++++++
4 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b82bcce..eef78cf 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -761,21 +761,13 @@ public class StorageGroupProcessor {
private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
throws WriteProcessException {
- MNode node = null;
- try {
- MManager manager = MManager.getInstance();
- node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
- String[] measurementList = plan.getMeasurements();
- for (int i = 0; i < measurementList.length; i++) {
- // Update cached last value with high priority
- ((LeafMNode) manager.getChild(node, measurementList[i]))
- .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
- }
- } catch (MetadataException e) {
- throw new WriteProcessException(e);
- } finally {
+ MNode node = plan.getDeviceMNode();
+ String[] measurementList = plan.getMeasurements();
+ for (int i = 0; i < measurementList.length; i++) {
+ // Update cached last value with high priority
if (node != null) {
- ((InternalMNode) node).readUnlock();
+ ((LeafMNode) node.getChild(measurementList[i]))
+ .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
}
}
}
@@ -813,24 +805,16 @@ public class StorageGroupProcessor {
private void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime)
throws WriteProcessException {
- MNode node = null;
- try {
- MManager manager = MManager.getInstance();
- node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
- String[] measurementList = plan.getMeasurements();
- for (int i = 0; i < measurementList.length; i++) {
- if (plan.getValues()[i] == null) {
- continue;
- }
- // Update cached last value with high priority
- ((LeafMNode) manager.getChild(node, measurementList[i]))
- .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
+ MNode node = plan.getDeviceMNode();
+ String[] measurementList = plan.getMeasurements();
+ for (int i = 0; i < measurementList.length; i++) {
+ if (plan.getValues()[i] == null) {
+ continue;
}
- } catch (MetadataException e) {
- throw new WriteProcessException(e);
- } finally {
+ // Update cached last value with high priority
if (node != null) {
- ((InternalMNode) node).readUnlock();
+ ((LeafMNode) node.getChild(measurementList[i]))
+ .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 27c31e6..efa68f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -916,6 +916,7 @@ public class PlanExecutor implements IPlanExecutor {
insertPlan.setMeasurements(measurementList);
insertPlan.setSchemasAndTransferType(schemas);
+ insertPlan.setDeviceMNode(node);
StorageEngine.getInstance().insert(insertPlan);
if (insertPlan.getFailedMeasurements() != null) {
throw new StorageEngineException(
@@ -1055,6 +1056,7 @@ public class PlanExecutor implements IPlanExecutor {
measurementList[i] = measurementNode.getName();
}
insertTabletPlan.setSchemas(schemas);
+ insertTabletPlan.setDeviceMNode(node);
return StorageEngine.getInstance().insertTablet(insertTabletPlan);
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index f617a25..611aa31 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -29,6 +29,7 @@ import java.util.Objects;
import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -57,6 +58,8 @@ public class InsertPlan extends PhysicalPlan {
private TSDataType[] types;
private MeasurementSchema[] schemas;
+ private MNode deviceMNode;
+
// if inferType is false, use the type of values directly
// if inferType is true, values is String[], and infer types from them
private boolean inferType = false;
@@ -362,6 +365,14 @@ public class InsertPlan extends PhysicalPlan {
return failedMeasurements == null ? 0 : failedMeasurements.size();
}
+ public MNode getDeviceMNode() {
+ return deviceMNode;
+ }
+
+ public void setDeviceMNode(MNode deviceMNode) {
+ this.deviceMNode = deviceMNode;
+ }
+
public TSDataType[] getTypes() {
return types;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 2928116..97e5781 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -56,6 +57,8 @@ public class InsertTabletPlan extends PhysicalPlan {
private long[] times; // times should be sorted. It is done in the session API.
private ByteBuffer timeBuffer;
+ private MNode deviceMNode;
+
private Object[] columns;
private ByteBuffer valueBuffer;
private Set<Integer> index;
@@ -356,6 +359,14 @@ public class InsertTabletPlan extends PhysicalPlan {
this.measurements = measurements;
}
+ public MNode getDeviceMNode() {
+ return deviceMNode;
+ }
+
+ public void setDeviceMNode(MNode deviceMNode) {
+ this.deviceMNode = deviceMNode;
+ }
+
public TSDataType[] getDataTypes() {
return dataTypes;
}