You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/26 05:16:52 UTC

[incubator-iotdb] 01/01: try to fix last cache

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_tablet_last
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit effcddce34e34149e745538828902eed33cb4a88
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Apr 26 13:16:33 2020 +0800

    try to fix last cache
---
 .../db/engine/storagegroup/StorageGroupProcessor.java    |  7 ++++---
 .../iotdb/db/qp/physical/crud/InsertTabletPlan.java      | 16 ++++++++--------
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 803e147..74bfa63 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -575,6 +575,7 @@ public class StorageGroupProcessor {
       return;
     }
 
+    logger.info("@+++<<<: current batch start {} end {}", start, end);
     TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
     if (tsFileProcessor == null) {
       for (int i = start; i < end; i++) {
@@ -601,7 +602,7 @@ public class StorageGroupProcessor {
     }
     long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
         insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
-    tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
+    tryToUpdateBatchInsertLastCache(insertTabletPlan, end -1, globalLatestFlushedTime);
 
     // check memtable size and may async try to flush the work memtable
     if (tsFileProcessor.shouldFlush()) {
@@ -609,7 +610,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
+  public void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, int lastIndex, Long latestFlushedTime)
       throws WriteProcessException {
     MNode node = null;
     try {
@@ -619,7 +620,7 @@ public class StorageGroupProcessor {
         // Update cached last value with high priority
         MNode measurementNode = node.getChild(measurementList[i]);
         ((LeafMNode) measurementNode)
-            .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
+            .updateCachedLast(plan.composeLastTimeValuePair(i, lastIndex), true, latestFlushedTime);
       }
     } catch (MetadataException e) {
       throw new WriteProcessException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index a26ac3b..65abe11 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -417,7 +417,7 @@ public class InsertTabletPlan extends PhysicalPlan {
     return tmpMaxTime;
   }
 
-  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+  public TimeValuePair composeLastTimeValuePair(int measurementIndex, int lastIndex) {
     if (measurementIndex >= columns.length) {
       return null;
     }
@@ -425,33 +425,33 @@ public class InsertTabletPlan extends PhysicalPlan {
     switch (dataTypes[measurementIndex]) {
       case INT32:
         int[] intValues = (int[]) columns[measurementIndex];
-        value = new TsInt(intValues[end - 1]);
+        value = new TsInt(intValues[lastIndex]);
         break;
       case INT64:
         long[] longValues = (long[]) columns[measurementIndex];
-        value = new TsLong(longValues[end - 1]);
+        value = new TsLong(longValues[lastIndex]);
         break;
       case FLOAT:
         float[] floatValues = (float[]) columns[measurementIndex];
-        value = new TsFloat(floatValues[end - 1]);
+        value = new TsFloat(floatValues[lastIndex]);
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) columns[measurementIndex];
-        value = new TsDouble(doubleValues[end - 1]);
+        value = new TsDouble(doubleValues[lastIndex]);
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) columns[measurementIndex];
-        value = new TsBoolean(boolValues[end - 1]);
+        value = new TsBoolean(boolValues[lastIndex]);
         break;
       case TEXT:
         Binary[] binaryValues = (Binary[]) columns[measurementIndex];
-        value = new TsBinary(binaryValues[end - 1]);
+        value = new TsBinary(binaryValues[lastIndex]);
         break;
       default:
         throw new UnSupportedDataTypeException(
             String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
     }
-    return new TimeValuePair(times[end - 1], value);
+    return new TimeValuePair(times[lastIndex], value);
   }
 
   public long[] getTimes() {