You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/26 05:16:51 UTC

[incubator-iotdb] branch fix_tablet_last created (now effcddc)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch fix_tablet_last
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at effcddc  try to fix last cache

This branch includes the following new commits:

     new effcddc  try to fix last cache

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: try to fix last cache

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_tablet_last
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit effcddce34e34149e745538828902eed33cb4a88
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Apr 26 13:16:33 2020 +0800

    try to fix last cache
---
 .../db/engine/storagegroup/StorageGroupProcessor.java    |  7 ++++---
 .../iotdb/db/qp/physical/crud/InsertTabletPlan.java      | 16 ++++++++--------
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 803e147..74bfa63 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -575,6 +575,7 @@ public class StorageGroupProcessor {
       return;
     }
 
+    logger.info("@+++<<<: current batch start {} end {}", start, end);
     TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
     if (tsFileProcessor == null) {
       for (int i = start; i < end; i++) {
@@ -601,7 +602,7 @@ public class StorageGroupProcessor {
     }
     long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
         insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
-    tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
+    tryToUpdateBatchInsertLastCache(insertTabletPlan, end -1, globalLatestFlushedTime);
 
     // check memtable size and may async try to flush the work memtable
     if (tsFileProcessor.shouldFlush()) {
@@ -609,7 +610,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
+  public void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, int lastIndex, Long latestFlushedTime)
       throws WriteProcessException {
     MNode node = null;
     try {
@@ -619,7 +620,7 @@ public class StorageGroupProcessor {
         // Update cached last value with high priority
         MNode measurementNode = node.getChild(measurementList[i]);
         ((LeafMNode) measurementNode)
-            .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
+            .updateCachedLast(plan.composeLastTimeValuePair(i, lastIndex), true, latestFlushedTime);
       }
     } catch (MetadataException e) {
       throw new WriteProcessException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index a26ac3b..65abe11 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -417,7 +417,7 @@ public class InsertTabletPlan extends PhysicalPlan {
     return tmpMaxTime;
   }
 
-  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+  public TimeValuePair composeLastTimeValuePair(int measurementIndex, int lastIndex) {
     if (measurementIndex >= columns.length) {
       return null;
     }
@@ -425,33 +425,33 @@ public class InsertTabletPlan extends PhysicalPlan {
     switch (dataTypes[measurementIndex]) {
       case INT32:
         int[] intValues = (int[]) columns[measurementIndex];
-        value = new TsInt(intValues[end - 1]);
+        value = new TsInt(intValues[lastIndex]);
         break;
       case INT64:
         long[] longValues = (long[]) columns[measurementIndex];
-        value = new TsLong(longValues[end - 1]);
+        value = new TsLong(longValues[lastIndex]);
         break;
       case FLOAT:
         float[] floatValues = (float[]) columns[measurementIndex];
-        value = new TsFloat(floatValues[end - 1]);
+        value = new TsFloat(floatValues[lastIndex]);
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) columns[measurementIndex];
-        value = new TsDouble(doubleValues[end - 1]);
+        value = new TsDouble(doubleValues[lastIndex]);
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) columns[measurementIndex];
-        value = new TsBoolean(boolValues[end - 1]);
+        value = new TsBoolean(boolValues[lastIndex]);
         break;
       case TEXT:
         Binary[] binaryValues = (Binary[]) columns[measurementIndex];
-        value = new TsBinary(binaryValues[end - 1]);
+        value = new TsBinary(binaryValues[lastIndex]);
         break;
       default:
         throw new UnSupportedDataTypeException(
             String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
     }
-    return new TimeValuePair(times[end - 1], value);
+    return new TimeValuePair(times[lastIndex], value);
   }
 
   public long[] getTimes() {