You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/27 14:55:49 UTC
[iotdb] branch master updated: Update last cache when insert (#6048)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3b900430e8 Update last cache when insert (#6048)
3b900430e8 is described below
commit 3b900430e8ab6c5543b51039d95f92b545d00499
Author: Haonan <hh...@outlook.com>
AuthorDate: Fri May 27 22:55:44 2022 +0800
Update last cache when insert (#6048)
---
.../iotdb/db/engine/storagegroup/DataRegion.java | 42 +++++++++++++++--
.../planner/plan/node/write/InsertRowNode.java | 10 ++++
.../planner/plan/node/write/InsertTabletNode.java | 55 ++++++++++++++++++++++
3 files changed, 104 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 7bb7abe855..2073fc8637 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.IDTableManager;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
@@ -1078,8 +1079,7 @@ public class DataRegion {
}
long globalLatestFlushedTime =
lastFlushTimeManager.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
- // TODO:LAST CACHE
- // tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime);
+ tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime);
if (!noFailure) {
throw new BatchProcessException(results);
@@ -1253,6 +1253,24 @@ public class DataRegion {
}
}
+ private void tryToUpdateBatchInsertLastCache(InsertTabletNode node, long latestFlushedTime) {
+ if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
+ return;
+ }
+ for (int i = 0; i < node.getColumns().length; i++) {
+ if (node.getColumns()[i] == null) {
+ continue;
+ }
+ // Update cached last value with high priority
+ DataNodeSchemaCache.getInstance()
+ .updateLastCache(
+ node.getDevicePath().concatNode(node.getMeasurements()[i]),
+ node.composeLastTimeValuePair(i),
+ true,
+ latestFlushedTime);
+ }
+ }
+
private void insertToTsFileProcessor(
InsertRowPlan insertRowPlan, boolean sequence, long timePartitionId)
throws WriteProcessException {
@@ -1295,7 +1313,7 @@ public class DataRegion {
long globalLatestFlushTime =
lastFlushTimeManager.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
- // tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
+ tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
@@ -1328,6 +1346,24 @@ public class DataRegion {
}
}
+ private void tryToUpdateInsertLastCache(InsertRowNode node, long latestFlushedTime) {
+ if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
+ return;
+ }
+ for (int i = 0; i < node.getValues().length; i++) {
+ if (node.getValues()[i] == null) {
+ continue;
+ }
+ // Update cached last value with high priority
+ DataNodeSchemaCache.getInstance()
+ .updateLastCache(
+ node.getDevicePath().concatNode(node.getMeasurements()[i]),
+ node.composeTimeValuePair(i),
+ true,
+ latestFlushedTime);
+ }
+ }
+
/**
* WAL module uses this method to flush memTable
*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index 014d08d211..8e40d44869 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -42,8 +42,10 @@ import org.apache.iotdb.db.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -614,4 +616,12 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitInsertRow(this, context);
}
+
+ /**
+  * Wraps the value at the given position together with this row's timestamp.
+  *
+  * @param columnIndex position of the value within this row
+  * @return the time/value pair for that position, or null when {@code columnIndex} is at or
+  *     beyond the number of values
+  */
+ public TimeValuePair composeTimeValuePair(int columnIndex) {
+   if (columnIndex < values.length) {
+     Object cell = values[columnIndex];
+     TsPrimitiveType wrapped = TsPrimitiveType.getByType(dataTypes[columnIndex], cell);
+     return new TimeValuePair(time, wrapped);
+   }
+   return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index e076209edc..21422d124d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -39,10 +39,12 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.DataInputStream;
@@ -834,4 +836,57 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitInsertTablet(this, context);
}
+
+ /**
+  * Builds the time/value pair for the last non-null row of one column.
+  *
+  * <p>Rows are examined from {@code rowCount - 1} downwards; a row is treated as null when the
+  * column has a bitmap and that bitmap marks the row.
+  *
+  * @param measurementIndex index of the column to read
+  * @return the pair for the last unmarked row, or null when {@code measurementIndex} is at or
+  *     beyond the number of columns or no unmarked row exists
+  * @throws UnSupportedDataTypeException when the column's data type is not one of the handled
+  *     types
+  */
+ public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+   if (measurementIndex >= columns.length) {
+     return null;
+   }
+
+   // Walk backwards past the rows the bitmap marks as null.
+   int row = rowCount - 1;
+   BitMap nullMarks = (bitMaps == null) ? null : bitMaps[measurementIndex];
+   if (nullMarks != null) {
+     while (row >= 0 && nullMarks.isMarked(row)) {
+       row--;
+     }
+   }
+   if (row < 0) {
+     return null;
+   }
+
+   TSDataType type = dataTypes[measurementIndex];
+   Object column = columns[measurementIndex];
+   TsPrimitiveType value;
+   switch (type) {
+     case INT32:
+       value = new TsPrimitiveType.TsInt(((int[]) column)[row]);
+       break;
+     case INT64:
+       value = new TsPrimitiveType.TsLong(((long[]) column)[row]);
+       break;
+     case FLOAT:
+       value = new TsPrimitiveType.TsFloat(((float[]) column)[row]);
+       break;
+     case DOUBLE:
+       value = new TsPrimitiveType.TsDouble(((double[]) column)[row]);
+       break;
+     case BOOLEAN:
+       value = new TsPrimitiveType.TsBoolean(((boolean[]) column)[row]);
+       break;
+     case TEXT:
+       value = new TsPrimitiveType.TsBinary(((Binary[]) column)[row]);
+       break;
+     default:
+       throw new UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, type));
+   }
+   return new TimeValuePair(times[row], value);
+ }
}