You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/27 14:55:49 UTC

[iotdb] branch master updated: Update last cache when insert (#6048)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b900430e8 Update last cache when insert (#6048)
3b900430e8 is described below

commit 3b900430e8ab6c5543b51039d95f92b545d00499
Author: Haonan <hh...@outlook.com>
AuthorDate: Fri May 27 22:55:44 2022 +0800

    Update last cache when insert (#6048)
---
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 42 +++++++++++++++--
 .../planner/plan/node/write/InsertRowNode.java     | 10 ++++
 .../planner/plan/node/write/InsertTabletNode.java  | 55 ++++++++++++++++++++++
 3 files changed, 104 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 7bb7abe855..2073fc8637 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.IDTableManager;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
@@ -1078,8 +1079,7 @@ public class DataRegion {
       }
       long globalLatestFlushedTime =
           lastFlushTimeManager.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
-      // TODO:LAST CACHE
-      //      tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime);
+      tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime);
 
       if (!noFailure) {
         throw new BatchProcessException(results);
@@ -1253,6 +1253,24 @@ public class DataRegion {
     }
   }
 
+  /** Updates the last cache from every non-null column of the tablet, if last cache is enabled. */
+  private void tryToUpdateBatchInsertLastCache(InsertTabletNode node, long latestFlushedTime) {
+    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
+      return; // last cache is switched off in the configuration
+    }
+    for (int col = 0; col < node.getColumns().length; col++) {
+      // a null column is skipped; every other column updates the cache with high priority
+      if (node.getColumns()[col] != null) {
+        DataNodeSchemaCache.getInstance()
+            .updateLastCache(
+                node.getDevicePath().concatNode(node.getMeasurements()[col]),
+                node.composeLastTimeValuePair(col),
+                true,
+                latestFlushedTime);
+      }
+    }
+  }
+
   private void insertToTsFileProcessor(
       InsertRowPlan insertRowPlan, boolean sequence, long timePartitionId)
       throws WriteProcessException {
@@ -1295,7 +1313,7 @@ public class DataRegion {
     long globalLatestFlushTime =
         lastFlushTimeManager.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
 
-    // tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
+    tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
 
     // check memtable size and may asyncTryToFlush the work memtable
     if (tsFileProcessor.shouldFlush()) {
@@ -1328,6 +1346,24 @@ public class DataRegion {
     }
   }
 
+  /** Updates the last cache from every non-null value of the row, if last cache is enabled. */
+  private void tryToUpdateInsertLastCache(InsertRowNode node, long latestFlushedTime) {
+    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
+      return; // last cache is switched off in the configuration
+    }
+    for (int col = 0; col < node.getValues().length; col++) {
+      // a null value is skipped; every other value updates the cache with high priority
+      if (node.getValues()[col] != null) {
+        DataNodeSchemaCache.getInstance()
+            .updateLastCache(
+                node.getDevicePath().concatNode(node.getMeasurements()[col]),
+                node.composeTimeValuePair(col),
+                true,
+                latestFlushedTime);
+      }
+    }
+  }
+
   /**
    * WAL module uses this method to flush memTable
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 014d08d211..8e40d44869 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -42,8 +42,10 @@ import org.apache.iotdb.db.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
@@ -614,4 +616,12 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitInsertRow(this, context);
   }
+
+  /** Pairs the row time with the value at columnIndex; null when the index is past the end. */
+  public TimeValuePair composeTimeValuePair(int columnIndex) {
+    return columnIndex >= values.length
+        ? null
+        : new TimeValuePair(
+            time, TsPrimitiveType.getByType(dataTypes[columnIndex], values[columnIndex]));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index e076209edc..21422d124d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -39,10 +39,12 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.DataInputStream;
@@ -834,4 +836,57 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitInsertTablet(this, context);
   }
+
+  /**
+   * Builds the pair for the last row of the given column that is not marked in its bitmap.
+   *
+   * @param measurementIndex index of the column to inspect
+   * @return time and value of that row, or null if the index is past the last column or every
+   *     row is marked
+   * @throws UnSupportedDataTypeException if the column's data type has no case below
+   */
+  public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+    if (measurementIndex >= columns.length) {
+      return null;
+    }
+
+    // step back over trailing rows marked in the bitmap to reach the last non-null value
+    int row = rowCount - 1;
+    BitMap marks = bitMaps == null ? null : bitMaps[measurementIndex];
+    if (marks != null) {
+      while (row >= 0 && marks.isMarked(row)) {
+        row--;
+      }
+    }
+    if (row < 0) {
+      return null;
+    }
+
+    Object column = columns[measurementIndex];
+    TsPrimitiveType value;
+    switch (dataTypes[measurementIndex]) {
+      case INT32:
+        value = new TsPrimitiveType.TsInt(((int[]) column)[row]);
+        break;
+      case INT64:
+        value = new TsPrimitiveType.TsLong(((long[]) column)[row]);
+        break;
+      case FLOAT:
+        value = new TsPrimitiveType.TsFloat(((float[]) column)[row]);
+        break;
+      case DOUBLE:
+        value = new TsPrimitiveType.TsDouble(((double[]) column)[row]);
+        break;
+      case BOOLEAN:
+        value = new TsPrimitiveType.TsBoolean(((boolean[]) column)[row]);
+        break;
+      case TEXT:
+        value = new TsPrimitiveType.TsBinary(((Binary[]) column)[row]);
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
+    }
+    return new TimeValuePair(times[row], value);
+  }
 }