You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/07/29 09:45:09 UTC

[iotdb] branch check_lost_data_ty updated: add checkers for InsertRowPlan and InsertRowsOfOneDevicePlan

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch check_lost_data_ty
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/check_lost_data_ty by this push:
     new a0c34a4  add checkers for InsertRowPlan and InsertRowsOfOneDevicePlan
a0c34a4 is described below

commit a0c34a4dbcb42746c4bf9118a66a0bb45ed3dd41
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Jul 29 17:44:15 2021 +0800

    add checkers for InsertRowPlan and InsertRowsOfOneDevicePlan
---
 .../engine/storagegroup/StorageGroupProcessor.java |  2 ++
 .../db/engine/storagegroup/TsFileProcessor.java    |  2 ++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  1 +
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   | 15 ++++++++++
 .../physical/crud/InsertRowsOfOneDevicePlan.java   | 35 +++++++++++++++++-----
 5 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8beca06..cecb564 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1050,6 +1050,7 @@ public class StorageGroupProcessor {
       return;
     }
 
+    insertRowPlan.checkForTianYuan("StorageGroupProcessor#insertToTsFileProcessor");
     tsFileProcessor.insert(insertRowPlan);
 
     // try to update the latest time of the device of this tsRecord
@@ -2910,6 +2911,7 @@ public class StorageGroupProcessor {
     writeLock("InsertRowsOfOneDevice");
     try {
       boolean isSequence = false;
+      insertRowsOfOneDevicePlan.checkForTianYuan("StorageGroupProcessor#insert");
       InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans();
       for (int i = 0, rowPlansLength = rowPlans.length; i < rowPlansLength; i++) {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9642271..fd2aaa8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -188,6 +188,8 @@ public class TsFileProcessor {
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
 
+    insertRowPlan.checkForTianYuan("TsFileProcessor#InsertRowPlan");
+
     workMemTable.insert(insertRowPlan);
 
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index b07750f..ce2ddf1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1198,6 +1198,7 @@ public class PlanExecutor implements IPlanExecutor {
         // we do not need to infer data type for insertRowsOfOneDevicePlan
       }
       // ok, we can begin to write data into the engine..
+      insertRowsOfOneDevicePlan.checkForTianYuan("PlanExecutor#insert");
       StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
 
       List<String> notExistedPaths = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 8399527..f7caf0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -63,6 +63,21 @@ public class InsertRowPlan extends InsertPlan {
 
   private List<Object> failedValues;
 
+  /** Debug aid: logs, tagged with {@code location}, a raw packet lacking the device suffix. */
+  public void checkForTianYuan(String location) {
+    for (int j = 0; j < getMeasurements().length; j++) {
+      // Null-safe match; the prefix below is clamped so packets under 35 chars do not throw.
+      if ("TY_0001_Raw_Packet".equals(getMeasurements()[j])) {
+        String raw = ((Binary) getValues()[j]).getStringValue();
+        String value = raw.substring(0, Math.min(35, raw.length()));
+        if (!value.contains(getDeviceId().getMeasurement().substring(4))) {
+          String msg = "{}: receive error data,device:{}, value(first 35 chars): {}";
+          logger.error(msg, location, getDeviceId(), value);
+        }
+      }
+    }
+  }
+
   public InsertRowPlan() {
     super(OperatorType.INSERT);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 46da4e0..a861adf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.BatchPlan;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,15 +35,29 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
   private static Logger logger = LoggerFactory.getLogger(InsertRowsOfOneDevicePlan.class);
   boolean[] isExecuted;
   private InsertRowPlan[] rowPlans;
 
+  /** Debug aid: logs, tagged with {@code location}, a raw packet lacking the device suffix. */
+  public void checkForTianYuan(String location) {
+    for (InsertRowPlan rowPlan : rowPlans) {
+      for (int j = 0; j < rowPlan.getMeasurements().length; j++) {
+        // Null-safe match; the prefix below is clamped so packets under 35 chars do not throw.
+        if ("TY_0001_Raw_Packet".equals(rowPlan.getMeasurements()[j])) {
+          String raw = ((Binary) rowPlan.getValues()[j]).getStringValue();
+          String value = raw.substring(0, Math.min(35, raw.length()));
+          if (!value.contains(rowPlan.getDeviceId().getMeasurement().substring(4))) {
+            String msg = "{}: receive error data,device:{}, value(first 35 chars): {}";
+            logger.error(msg, location, rowPlan.getDeviceId(), value);
+          }
+        }
+      }
+    }
+  }
+
   public InsertRowsOfOneDevicePlan(
       PartialPath deviceId,
       Long[] insertTimes,
@@ -67,12 +85,15 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
                 + ", time:"
                 + insertTimes[i]);
       }
-      //Just for Tianyuan debug
-      for(int j = 0; j < rowPlans[i].getMeasurements().length; j ++) {
+      // Just for Tianyuan debug
+      for (int j = 0; j < rowPlans[i].getMeasurements().length; j++) {
         if (rowPlans[i].getMeasurements()[j].equals("TY_0001_Raw_Packet")) {
-          String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0,100);
+          String value = ((Binary) rowPlans[i].getValues()[j]).getStringValue().substring(0, 100);
           if (!value.contains(rowPlans[i].getDeviceId().getMeasurement().substring(4))) {
-            logger.error("receive error data,device:{}, value(first 100 bytes): {}", rowPlans[i].getDeviceId(), value);
+            logger.error(
+                "receive error data,device:{}, value(first 100 bytes): {}",
+                rowPlans[i].getDeviceId(),
+                value);
           }
         }
       }