You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/07/29 09:45:09 UTC
[iotdb] branch check_lost_data_ty updated: add checkers for
InsertRowPlan and InsertRowsOfOneDevicePlan
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch check_lost_data_ty
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/check_lost_data_ty by this push:
new a0c34a4 add checkers for InsertRowPlan and InsertRowsOfOneDevicePlan
a0c34a4 is described below
commit a0c34a4dbcb42746c4bf9118a66a0bb45ed3dd41
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Jul 29 17:44:15 2021 +0800
add checkers for InsertRowPlan and InsertRowsOfOneDevicePlan
---
.../engine/storagegroup/StorageGroupProcessor.java | 2 ++
.../db/engine/storagegroup/TsFileProcessor.java | 2 ++
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 1 +
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 15 ++++++++++
.../physical/crud/InsertRowsOfOneDevicePlan.java | 35 +++++++++++++++++-----
5 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8beca06..cecb564 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1050,6 +1050,7 @@ public class StorageGroupProcessor {
return;
}
+ insertRowPlan.checkForTianYuan("StorageGroupProcessor#insertToTsFileProcessor");
tsFileProcessor.insert(insertRowPlan);
// try to update the latest time of the device of this tsRecord
@@ -2910,6 +2911,7 @@ public class StorageGroupProcessor {
writeLock("InsertRowsOfOneDevice");
try {
boolean isSequence = false;
+ insertRowsOfOneDevicePlan.checkForTianYuan("StorageGroupProcessor#insert");
InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans();
for (int i = 0, rowPlansLength = rowPlans.length; i < rowPlansLength; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9642271..fd2aaa8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -188,6 +188,8 @@ public class TsFileProcessor {
checkMemCostAndAddToTspInfo(insertRowPlan);
}
+ insertRowPlan.checkForTianYuan("TsFileProcessor#InsertRowPlan");
+
workMemTable.insert(insertRowPlan);
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index b07750f..ce2ddf1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1198,6 +1198,7 @@ public class PlanExecutor implements IPlanExecutor {
// we do not need to infer data type for insertRowsOfOneDevicePlan
}
// ok, we can begin to write data into the engine..
+ insertRowsOfOneDevicePlan.checkForTianYuan("PlanExecutor#insert");
StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
List<String> notExistedPaths = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 8399527..f7caf0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -63,6 +63,21 @@ public class InsertRowPlan extends InsertPlan {
private List<Object> failedValues;
+ public void checkForTianYuan(String location) {
+ for (int j = 0; j < getMeasurements().length; j++) {
+ if (getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
+ String value = ((Binary) getValues()[j]).getStringValue().substring(0, 35);
+ if (!value.contains(getDeviceId().getMeasurement().substring(4))) {
+ logger.error(
+ "{}: receive error data,device:{}, value(first 100 bytes): {}",
+ location,
+ getDeviceId(),
+ value);
+ }
+ }
+ }
+ }
+
public InsertRowPlan() {
super(OperatorType.INSERT);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 46da4e0..a861adf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.BatchPlan;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -31,15 +35,29 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
private static Logger logger = LoggerFactory.getLogger(InsertRowsOfOneDevicePlan.class);
boolean[] isExecuted;
private InsertRowPlan[] rowPlans;
+ public void checkForTianYuan(String location) {
+ for (int i = 0; i < rowPlans.length; ++i) {
+ for (int j = 0; j < rowPlans[i].getMeasurements().length; j++) {
+ if (rowPlans[i].getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
+ String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0, 35);
+ if (!value.contains(rowPlans[i].getDeviceId().getMeasurement().substring(4))) {
+ logger.error(
+ "{}: receive error data,device:{}, value(first 100 bytes): {}",
+ location,
+ rowPlans[i].getDeviceId(),
+ value);
+ }
+ }
+ }
+ }
+ }
+
public InsertRowsOfOneDevicePlan(
PartialPath deviceId,
Long[] insertTimes,
@@ -67,12 +85,15 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
+ ", time:"
+ insertTimes[i]);
}
- //Just for Tianyuan debug
- for(int j = 0; j < rowPlans[i].getMeasurements().length; j ++) {
+ // Just for Tianyuan debug
+ for (int j = 0; j < rowPlans[i].getMeasurements().length; j++) {
if (rowPlans[i].getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
- String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0,100);
+ String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0, 100);
if (!value.contains(rowPlans[i].getDeviceId().getMeasurement().substring(4))) {
- logger.error("receive error data,device:{}, value(first 100 bytes): {}", rowPlans[i].getDeviceId(), value);
+ logger.error(
+ "receive error data,device:{}, value(first 100 bytes): {}",
+ rowPlans[i].getDeviceId(),
+ value);
}
}
}