You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/09/12 04:32:34 UTC
[iotdb] branch master updated: [IOTDB-1543] LastCache for Template
and Vector (#3796)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3cb08e0 [IOTDB-1543] LastCache for Template and Vector (#3796)
3cb08e0 is described below
commit 3cb08e0243269e4d5980942397ad2cccede081ce
Author: zyk990424 <38...@users.noreply.github.com>
AuthorDate: Sun Sep 12 12:32:10 2021 +0800
[IOTDB-1543] LastCache for Template and Vector (#3796)
---
.../apache/iotdb/cluster/metadata/CMManager.java | 11 +-
.../engine/storagegroup/StorageGroupProcessor.java | 116 +++-----
.../org/apache/iotdb/db/metadata/MManager.java | 195 +++++++++++-
.../java/org/apache/iotdb/db/metadata/MTree.java | 46 +--
.../db/metadata/lastCache/LastCacheManager.java | 331 +++++++++++++++++++++
.../container/ILastCacheContainer.java} | 56 ++--
.../lastCache/container/LastCacheContainer.java | 118 ++++++++
.../lastCache/container/value/ILastCacheValue.java | 47 +++
.../container/value/UnaryLastCacheValue.java | 106 +++++++
.../container/value/VectorLastCacheValue.java | 86 ++++++
.../iotdb/db/metadata/mnode/EntityMNode.java | 25 ++
.../iotdb/db/metadata/mnode/IEntityMNode.java | 6 +
.../iotdb/db/metadata/mnode/IMeasurementMNode.java | 9 +-
.../iotdb/db/metadata/mnode/MeasurementMNode.java | 45 +--
.../apache/iotdb/db/metadata/tag/TagManager.java | 5 +-
.../iotdb/db/query/executor/LastQueryExecutor.java | 40 ++-
.../iotdb/db/engine/storagegroup/TTLTest.java | 4 +-
.../apache/iotdb/db/integration/IoTDBLastIT.java | 28 +-
.../iotdb/db/metadata/MManagerAdvancedTest.java | 16 +-
19 files changed, 1063 insertions(+), 227 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index fa06e12..75ae379 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.VectorPartialPath;
+import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.InternalMNode;
@@ -407,26 +408,26 @@ public class CMManager extends MManager {
PartialPath seriesPath,
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
- Long latestFlushedTime,
- IMeasurementMNode node) {
+ Long latestFlushedTime) {
cacheLock.writeLock().lock();
try {
IMeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath);
if (measurementMNode != null) {
- measurementMNode.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
+ LastCacheManager.updateLastCache(
+ seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, measurementMNode);
}
} finally {
cacheLock.writeLock().unlock();
}
// maybe local also has the timeseries
- super.updateLastCache(seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, node);
+ super.updateLastCache(seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime);
}
@Override
public TimeValuePair getLastCache(PartialPath seriesPath) {
IMeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath);
if (measurementMNode != null) {
- return measurementMNode.getCachedLast();
+ return LastCacheManager.getLastCache(seriesPath, measurementMNode);
}
return super.getLastCache(seriesPath);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a5b3933..9ded4e6 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -55,7 +55,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -75,7 +74,6 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -1090,32 +1088,34 @@ public class StorageGroupProcessor {
return;
}
IMeasurementMNode[] mNodes = plan.getMeasurementMNodes();
- int columnIndex = 0;
for (int i = 0; i < mNodes.length; i++) {
- // Don't update cached last value for vector type
- if (mNodes[i] != null && plan.isAligned()) {
- columnIndex += mNodes[i].getSchema().getValueMeasurementIdList().size();
+ if (plan.getColumns()[i] == null) {
+ continue;
+ }
+ // Update cached last value with high priority
+ if (mNodes[i] == null) {
+ // no matter aligned or not, concat the path to use the full path to update LastCache
+ IoTDB.metaManager.updateLastCache(
+ plan.getPrefixPath().concatNode(plan.getMeasurements()[i]),
+ plan.composeLastTimeValuePair(i),
+ true,
+ latestFlushedTime);
} else {
- if (plan.getColumns()[i] == null) {
- columnIndex++;
- continue;
- }
- // Update cached last value with high priority
- if (mNodes[i] != null) {
- // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
- // update last cache
+ if (plan.isAligned()) {
+ // vector lastCache update need subMeasurement
IoTDB.metaManager.updateLastCache(
- null, plan.composeLastTimeValuePair(columnIndex), true, latestFlushedTime, mNodes[i]);
+ mNodes[i],
+ plan.getMeasurements()[i],
+ plan.composeLastTimeValuePair(i),
+ true,
+ latestFlushedTime);
+
} else {
- // measurementMNodes[i] is null, use the path to update remote cache
+ // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
+ // update last cache
IoTDB.metaManager.updateLastCache(
- plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]),
- plan.composeLastTimeValuePair(columnIndex),
- true,
- latestFlushedTime,
- null);
+ mNodes[i], plan.composeLastTimeValuePair(i), true, latestFlushedTime);
}
- columnIndex++;
}
}
}
@@ -1157,29 +1157,33 @@ public class StorageGroupProcessor {
return;
}
IMeasurementMNode[] mNodes = plan.getMeasurementMNodes();
- int columnIndex = 0;
- for (IMeasurementMNode mNode : mNodes) {
- // Don't update cached last value for vector type
- if (!plan.isAligned()) {
- if (plan.getValues()[columnIndex] == null) {
- columnIndex++;
- continue;
- }
- // Update cached last value with high priority
- if (mNode != null) {
- // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
- // update last cache
+ for (int i = 0; i < mNodes.length; i++) {
+ if (plan.getValues()[i] == null) {
+ continue;
+ }
+ // Update cached last value with high priority
+ if (mNodes[i] == null) {
+ // no matter aligned or not, concat the path to use the full path to update LastCache
+ IoTDB.metaManager.updateLastCache(
+ plan.getPrefixPath().concatNode(plan.getMeasurements()[i]),
+ plan.composeTimeValuePair(i),
+ true,
+ latestFlushedTime);
+ } else {
+ if (plan.isAligned()) {
+ // vector lastCache update need subSensor path
IoTDB.metaManager.updateLastCache(
- null, plan.composeTimeValuePair(columnIndex), true, latestFlushedTime, mNode);
+ mNodes[i],
+ plan.getMeasurements()[i],
+ plan.composeTimeValuePair(i),
+ true,
+ latestFlushedTime);
} else {
+ // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
+ // update last cache
IoTDB.metaManager.updateLastCache(
- plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]),
- plan.composeTimeValuePair(columnIndex),
- true,
- latestFlushedTime,
- null);
+ mNodes[i], plan.composeTimeValuePair(i), true, latestFlushedTime);
}
- columnIndex++;
}
}
}
@@ -2045,24 +2049,7 @@ public class StorageGroupProcessor {
return;
}
try {
- IMNode node = IoTDB.metaManager.getDeviceNode(deviceId);
-
- for (IMNode measurementNode : node.getChildren().values()) {
- if (measurementNode != null
- && originalPath.matchFullPath(measurementNode.getPartialPath())) {
- TimeValuePair lastPair = ((IMeasurementMNode) measurementNode).getCachedLast();
- if (lastPair != null
- && startTime <= lastPair.getTimestamp()
- && lastPair.getTimestamp() <= endTime) {
- ((IMeasurementMNode) measurementNode).resetCache();
- if (logger.isDebugEnabled()) {
- logger.debug(
- "[tryToDeleteLastCache] Last cache for path: {} is set to null",
- measurementNode.getFullPath());
- }
- }
- }
- }
+ IoTDB.metaManager.deleteLastCacheByDevice(deviceId, originalPath, startTime, endTime);
} catch (MetadataException e) {
throw new WriteProcessException(e);
}
@@ -2395,16 +2382,7 @@ public class StorageGroupProcessor {
return;
}
try {
- IMNode node = IoTDB.metaManager.getDeviceNode(deviceId);
-
- for (IMNode measurementNode : node.getChildren().values()) {
- if (measurementNode != null) {
- ((IMeasurementMNode) measurementNode).resetCache();
- logger.debug(
- "[tryToDeleteLastCacheByDevice] Last cache for path: {} is set to null",
- measurementNode.getFullPath());
- }
- }
+ IoTDB.metaManager.deleteLastCacheByDevice(deviceId);
} catch (MetadataException e) {
// the path doesn't cache in cluster mode now, ignore
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 9368523..5ea0477 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine;
import org.apache.iotdb.db.exception.metadata.*;
+import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
import org.apache.iotdb.db.metadata.logfile.MLogReader;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
import org.apache.iotdb.db.metadata.mnode.*;
@@ -1661,32 +1662,198 @@ public class MManager {
// do nothing
}
+ /**
+ * Update the last cache value of time series of given seriesPath.
+ *
+ * <p>MManager will use the seriesPath to search the node first and then process the lastCache in
+ * the MeasurementMNode
+ *
+ * <p>Invoking scenario: (1) after executing insertPlan (2) after reading last value from file
+ * during last Query
+ *
+ * @param seriesPath the path of timeseries or subMeasurement of aligned timeseries
+ * @param timeValuePair the latest point value
+ * @param highPriorityUpdate the last value from insertPlan is high priority
+ * @param latestFlushedTime latest flushed time
+ */
public void updateLastCache(
PartialPath seriesPath,
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
- Long latestFlushedTime,
- IMeasurementMNode node) {
- if (node != null) {
- node.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
- } else {
- try {
- IMeasurementMNode node1 = (IMeasurementMNode) mtree.getNodeByPath(seriesPath);
- node1.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
- } catch (MetadataException e) {
- logger.warn("failed to update last cache for the {}, err:{}", seriesPath, e.getMessage());
- }
+ Long latestFlushedTime) {
+ IMeasurementMNode node;
+ try {
+ node = (IMeasurementMNode) mtree.getNodeByPath(seriesPath);
+ } catch (MetadataException e) {
+ logger.warn("failed to update last cache for the {}, err:{}", seriesPath, e.getMessage());
+ return;
}
+
+ LastCacheManager.updateLastCache(
+ seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, node);
}
+ /**
+ * Update the last cache value in given unary MeasurementMNode. Vector lastCache operation won't
+ * work.
+ *
+ * <p>Invoking scenario: (1) after executing insertPlan (2) after reading last value from file
+ * during last Query
+ *
+ * @param node the measurementMNode holding the lastCache, must be unary measurement
+ * @param timeValuePair the latest point value
+ * @param highPriorityUpdate the last value from insertPlan is high priority
+ * @param latestFlushedTime latest flushed time
+ */
+ public void updateLastCache(
+ IMeasurementMNode node,
+ TimeValuePair timeValuePair,
+ boolean highPriorityUpdate,
+ Long latestFlushedTime) {
+ if (node.getSchema() instanceof VectorMeasurementSchema) {
+ throw new UnsupportedOperationException("Must provide subMeasurement for vector measurement");
+ }
+ LastCacheManager.updateLastCache(
+ node.getPartialPath(), timeValuePair, highPriorityUpdate, latestFlushedTime, node);
+ }
+
+ /**
+ * Update the last cache value of subMeasurement given Vector MeasurementMNode.
+ *
+ * <p>Invoking scenario: (1) after executing insertPlan (2) after reading last value from file
+ * during last Query
+ *
+ * @param node the measurementMNode holding the lastCache
+ * @param subMeasurement the subMeasurement of aligned timeseries
+ * @param timeValuePair the latest point value
+ * @param highPriorityUpdate the last value from insertPlan is high priority
+ * @param latestFlushedTime latest flushed time
+ */
+ public void updateLastCache(
+ IMeasurementMNode node,
+ String subMeasurement,
+ TimeValuePair timeValuePair,
+ boolean highPriorityUpdate,
+ Long latestFlushedTime) {
+ if (!(node.getSchema() instanceof VectorMeasurementSchema)) {
+ throw new UnsupportedOperationException(
+ "Can't update lastCache of subMeasurement in unary measurement");
+ }
+ LastCacheManager.updateLastCache(
+ node.getPartialPath().concatNode(subMeasurement),
+ timeValuePair,
+ highPriorityUpdate,
+ latestFlushedTime,
+ node);
+ }
+
+ /**
+ * Get the last cache value of time series of given seriesPath. MManager will use the seriesPath
+ * to search the node.
+ *
+ * <p>Invoking scenario: last cache read during last Query
+ *
+ * @param seriesPath the full path from root to measurement of timeseries or subMeasurement of
+ * aligned timeseries
+ * @return the last cache value
+ */
public TimeValuePair getLastCache(PartialPath seriesPath) {
+ IMeasurementMNode node;
try {
- IMeasurementMNode node = (IMeasurementMNode) mtree.getNodeByPath(seriesPath);
- return node.getCachedLast();
+ node = (IMeasurementMNode) mtree.getNodeByPath(seriesPath);
} catch (MetadataException e) {
logger.warn("failed to get last cache for the {}, err:{}", seriesPath, e.getMessage());
+ return null;
+ }
+
+ return LastCacheManager.getLastCache(seriesPath, node);
+ }
+
+ /**
+ * Get the last cache value in given unary MeasurementMNode. Vector case won't work.
+ *
+ * <p>Invoking scenario: last cache read during last Query
+ *
+ * @param node the measurementMNode holding the lastCache, must be unary measurement
+ * @return the last cache value
+ */
+ public TimeValuePair getLastCache(IMeasurementMNode node) {
+ if (node.getSchema() instanceof VectorMeasurementSchema) {
+ throw new UnsupportedOperationException("Must provide subMeasurement for vector measurement");
+ }
+ return LastCacheManager.getLastCache(node.getPartialPath(), node);
+ }
+
+ /**
+ * Get the last cache value of given subMeasurement of given MeasurementMNode. Must be Vector
+ * case.
+ *
+ * <p>Invoking scenario: last cache read during last Query
+ *
+ * @param node the measurementMNode holding the lastCache
+ * @param subMeasurement the subMeasurement of aligned timeseries
+ * @return the last cache value
+ */
+ public TimeValuePair getLastCache(IMeasurementMNode node, String subMeasurement) {
+ if (!(node.getSchema() instanceof VectorMeasurementSchema)) {
+ throw new UnsupportedOperationException(
+ "Can't get lastCache of subMeasurement from unary measurement");
+ }
+ return LastCacheManager.getLastCache(node.getPartialPath().concatNode(subMeasurement), node);
+ }
+
+ /**
+ * Reset the last cache value of time series of given seriesPath. MManager will use the seriesPath
+ * to search the node.
+ *
+ * @param seriesPath the path from root to measurement of timeseries or subMeasurement of aligned
+ * timeseries
+ */
+ public void resetLastCache(PartialPath seriesPath) {
+ IMeasurementMNode node;
+ try {
+ node = (IMeasurementMNode) mtree.getNodeByPath(seriesPath);
+ } catch (MetadataException e) {
+ logger.warn("failed to reset last cache for the {}, err:{}", seriesPath, e.getMessage());
+ return;
+ }
+
+ LastCacheManager.resetLastCache(seriesPath, node);
+ }
+
+ /**
+ * delete all the last cache value of any timeseries or aligned timeseries under the device
+ *
+ * <p>Invoking scenario (1) after upload tsfile
+ *
+ * @param deviceId path of device
+ */
+ public void deleteLastCacheByDevice(PartialPath deviceId) throws MetadataException {
+ IMNode node = getDeviceNode(deviceId);
+ if (node.isEntity()) {
+ LastCacheManager.deleteLastCacheByDevice((IEntityMNode) node);
+ }
+ }
+
+ /**
+ * delete the last cache value of timeseries or subMeasurement of some aligned timeseries, which
+ * is under the device and matching the originalPath
+ *
+ * <p>Invoking scenario (1) delete timeseries
+ *
+ * @param deviceId path of device
+ * @param originalPath origin timeseries path
+ * @param startTime startTime
+ * @param endTime endTime
+ */
+ public void deleteLastCacheByDevice(
+ PartialPath deviceId, PartialPath originalPath, long startTime, long endTime)
+ throws MetadataException {
+ IMNode node = IoTDB.metaManager.getDeviceNode(deviceId);
+ if (node.isEntity()) {
+ LastCacheManager.deleteLastCacheByDevice(
+ (IEntityMNode) node, originalPath, startTime, endTime);
}
- return null;
}
/** get schema for device. Attention!!! Only support insertPlan */
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 755605f..e6d69f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.metadata;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -30,6 +29,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager.StorageGroupFilter;
+import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
import org.apache.iotdb.db.metadata.logfile.MLogReader;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
@@ -49,14 +49,11 @@ import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
-import org.apache.iotdb.db.query.executor.fill.LastPointReader;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -176,38 +173,6 @@ public class MTree implements Serializable {
}
}
- public static long getLastTimeStamp(IMeasurementMNode node, QueryContext queryContext) {
- TimeValuePair last = node.getCachedLast();
- if (last != null) {
- return node.getCachedLast().getTimestamp();
- } else {
- try {
- QueryDataSource dataSource =
- QueryResourceManager.getInstance()
- .getQueryDataSource(node.getPartialPath(), queryContext, null);
- Set<String> measurementSet = new HashSet<>();
- measurementSet.add(node.getPartialPath().getFullPath());
- LastPointReader lastReader =
- new LastPointReader(
- node.getPartialPath(),
- node.getSchema().getType(),
- measurementSet,
- queryContext,
- dataSource,
- Long.MAX_VALUE,
- null);
- last = lastReader.readLastPoint();
- return (last != null ? last.getTimestamp() : Long.MIN_VALUE);
- } catch (Exception e) {
- logger.error(
- "Something wrong happened while trying to get last time value pair of {}",
- node.getFullPath(),
- e);
- return Long.MIN_VALUE;
- }
- }
- }
-
private static String jsonToString(JsonObject jsonObject) {
return GSON.toJson(jsonObject);
}
@@ -1395,7 +1360,8 @@ public class MTree implements Serializable {
tsRow[5] = String.valueOf(((IMeasurementMNode) node).getOffset());
tsRow[6] =
needLast
- ? String.valueOf(getLastTimeStamp((IMeasurementMNode) node, queryContext))
+ ? String.valueOf(
+ LastCacheManager.getLastTimeStamp((IMeasurementMNode) node, queryContext))
: null;
Pair<PartialPath, String[]> temp = new Pair<>(nodePath, tsRow);
timeseriesSchemaList.add(temp);
@@ -1425,7 +1391,8 @@ public class MTree implements Serializable {
tsRow[5] = "-1";
tsRow[6] =
needLast
- ? String.valueOf(getLastTimeStamp((IMeasurementMNode) node, queryContext))
+ ? String.valueOf(
+ LastCacheManager.getLastTimeStamp((IMeasurementMNode) node, queryContext))
: null;
Pair<PartialPath, String[]> temp =
new Pair<>(new PartialPath(devicePath.getFullPath(), measurements.get(i)), tsRow);
@@ -1458,7 +1425,8 @@ public class MTree implements Serializable {
tsRow[5] = "-1";
tsRow[6] =
needLast
- ? String.valueOf(getLastTimeStamp((IMeasurementMNode) node, queryContext))
+ ? String.valueOf(
+ LastCacheManager.getLastTimeStamp((IMeasurementMNode) node, queryContext))
: null;
Pair<PartialPath, String[]> temp =
new Pair<>(new PartialPath(devicePath.getFullPath(), measurements.get(i)), tsRow);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java
new file mode 100644
index 0000000..031b140
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.lastCache;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.executor.fill.LastPointReader;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+// this class provides all the operations on last cache
+public class LastCacheManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(LastCacheManager.class);
+
+ /**
+ * get the last cache value of time series of given seriesPath
+ *
+ * @param seriesPath the path of timeseries or subMeasurement of aligned timeseries
+ * @param node the measurementMNode holding the lastCache When invoker only has the target
+ * seriesPath, the node could be null and MManager will search the node
+ * @return the last cache value
+ */
+ public static TimeValuePair getLastCache(PartialPath seriesPath, IMeasurementMNode node) {
+ if (node == null) {
+ return null;
+ }
+
+ checkIsTemplateLastCacheAndSetIfAbsent(node);
+
+ ILastCacheContainer lastCacheContainer = node.getLastCacheContainer();
+ if (seriesPath == null) {
+ return lastCacheContainer.getCachedLast();
+ } else {
+ String measurementId = seriesPath.getMeasurement();
+ if (measurementId.equals(node.getName()) || measurementId.equals(node.getAlias())) {
+ return lastCacheContainer.getCachedLast();
+ } else {
+ IMeasurementSchema schema = node.getSchema();
+ if (schema instanceof VectorMeasurementSchema) {
+ return lastCacheContainer.getCachedLast(
+ schema.getMeasurementIdColumnIndex(seriesPath.getMeasurement()));
+ }
+ return null;
+ }
+ }
+ }
+
+ /**
+ * update the last cache value of time series of given seriesPath
+ *
+ * @param seriesPath the path of timeseries or subMeasurement of aligned timeseries
+ * @param timeValuePair the latest point value
+ * @param highPriorityUpdate the last value from insertPlan is high priority
+ * @param latestFlushedTime latest flushed time
+ * @param node the measurementMNode holding the lastCache When invoker only has the target
+ * seriesPath, the node could be null and MManager will search the node
+ */
+ public static void updateLastCache(
+ PartialPath seriesPath,
+ TimeValuePair timeValuePair,
+ boolean highPriorityUpdate,
+ Long latestFlushedTime,
+ IMeasurementMNode node) {
+ if (node == null) {
+ return;
+ }
+
+ checkIsTemplateLastCacheAndSetIfAbsent(node);
+
+ ILastCacheContainer lastCacheContainer = node.getLastCacheContainer();
+ if (seriesPath == null) {
+ lastCacheContainer.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
+ } else {
+ String measurementId = seriesPath.getMeasurement();
+ if (measurementId.equals(node.getName()) || measurementId.equals(node.getAlias())) {
+ lastCacheContainer.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
+ } else {
+ IMeasurementSchema schema = node.getSchema();
+ if (schema instanceof VectorMeasurementSchema) {
+ if (lastCacheContainer.isEmpty()) {
+ lastCacheContainer.init(schema.getMeasurementCount());
+ }
+ lastCacheContainer.updateCachedLast(
+ schema.getMeasurementIdColumnIndex(seriesPath.getMeasurement()),
+ timeValuePair,
+ highPriorityUpdate,
+ latestFlushedTime);
+ }
+ }
+ }
+ }
+
+ /**
+ * reset the last cache value of time series of given seriesPath
+ *
+ * @param seriesPath the path of timeseries or subMeasurement of aligned timeseries
+ * @param node the measurementMNode holding the lastCache When invoker only has the target
+ * seriesPath, the node could be null and MManager will search the node
+ */
+ public static void resetLastCache(PartialPath seriesPath, IMeasurementMNode node) {
+ if (node == null) {
+ return;
+ }
+
+ checkIsTemplateLastCacheAndSetIfAbsent(node);
+
+ ILastCacheContainer lastCacheContainer = node.getLastCacheContainer();
+ if (seriesPath == null) {
+ lastCacheContainer.resetLastCache();
+ } else {
+ String measurementId = seriesPath.getMeasurement();
+ if (measurementId.equals(node.getName()) || measurementId.equals(node.getAlias())) {
+ lastCacheContainer.resetLastCache();
+ } else {
+ IMeasurementSchema schema = node.getSchema();
+ if (schema instanceof VectorMeasurementSchema) {
+ if (lastCacheContainer.isEmpty()) {
+ lastCacheContainer.init(schema.getMeasurementCount());
+ }
+ lastCacheContainer.resetLastCache(
+ schema.getMeasurementIdColumnIndex(seriesPath.getMeasurement()));
+ }
+ }
+ }
+ }
+
+ private static void checkIsTemplateLastCacheAndSetIfAbsent(IMeasurementMNode node) {
+ IEntityMNode entityMNode = node.getParent();
+ if (entityMNode == null) {
+ // cluster cached remote measurementMNode doesn't have parent
+ return;
+ }
+ String measurement = node.getName();
+
+ // if entityMNode doesn't have this child, the child is derived from template
+ if (!entityMNode.hasChild(measurement)) {
+ ILastCacheContainer lastCacheContainer = entityMNode.getLastCacheContainer(measurement);
+ IMeasurementSchema schema = node.getSchema();
+ if (lastCacheContainer.isEmpty() && (schema instanceof VectorMeasurementSchema)) {
+ lastCacheContainer.init(schema.getMeasurementCount());
+ }
+ node.setLastCacheContainer(lastCacheContainer);
+ }
+ }
+
+ /**
+ * delete all the last cache value of any timeseries or aligned timeseries under the entity
+ *
+ * @param node entity node
+ */
+ public static void deleteLastCacheByDevice(IEntityMNode node) {
+ // process lastCache of timeseries represented by measurementNode
+ for (IMNode measurementNode : node.getChildren().values()) {
+ if (measurementNode != null) {
+ ((IMeasurementMNode) measurementNode).getLastCacheContainer().resetLastCache();
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "[tryToDeleteLastCacheByDevice] Last cache for path: {} is set to null",
+ measurementNode.getFullPath());
+ }
+ }
+ }
+ // process lastCache of timeseries represented by template
+ for (Map.Entry<String, ILastCacheContainer> entry : node.getTemplateLastCaches().entrySet()) {
+ entry.getValue().resetLastCache();
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "[tryToDeleteLastCacheByDevice] Last cache for path: {} is set to null",
+ node.getPartialPath().concatNode(entry.getKey()).getFullPath());
+ }
+ }
+ }
+
+ /**
+ * delete the last cache value of timeseries or subMeasurement of some aligned timeseries, which
+ * is under the entity and matching the originalPath
+ *
+ * @param node entity node
+ * @param originalPath origin timeseries path
+ * @param startTime startTime
+ * @param endTime endTime
+ */
+ public static void deleteLastCacheByDevice(
+ IEntityMNode node, PartialPath originalPath, long startTime, long endTime) {
+ PartialPath path;
+ IMeasurementSchema schema;
+ ILastCacheContainer lastCacheContainer;
+
+ // process lastCache of timeseries represented by measurementNode
+ IMeasurementMNode measurementMNode;
+ for (IMNode child : node.getChildren().values()) {
+ if (child == null || !child.isMeasurement()) {
+ continue;
+ }
+ path = child.getPartialPath();
+ measurementMNode = (IMeasurementMNode) child;
+ if (originalPath.matchFullPath(path)) {
+ lastCacheContainer = measurementMNode.getLastCacheContainer();
+ if (lastCacheContainer == null) {
+ continue;
+ }
+ schema = measurementMNode.getSchema();
+ deleteLastCache(path, schema, lastCacheContainer, startTime, endTime);
+ }
+ }
+
+ // process lastCache of timeseries represented by template
+ Template template = node.getUpperTemplate();
+ for (Map.Entry<String, ILastCacheContainer> entry : node.getTemplateLastCaches().entrySet()) {
+ path = node.getPartialPath().concatNode(entry.getKey());
+ if (originalPath.matchFullPath(path)) {
+ lastCacheContainer = entry.getValue();
+ if (lastCacheContainer == null) {
+ continue;
+ }
+ schema = template.getSchemaMap().get(entry.getKey());
+ deleteLastCache(path, schema, lastCacheContainer, startTime, endTime);
+ }
+ }
+ }
+
+ private static void deleteLastCache(
+ PartialPath path,
+ IMeasurementSchema schema,
+ ILastCacheContainer lastCacheContainer,
+ long startTime,
+ long endTime) {
+ TimeValuePair lastPair;
+ if (schema instanceof VectorMeasurementSchema) {
+ int index;
+ for (String measurement : schema.getValueMeasurementIdList()) {
+ index = schema.getMeasurementIdColumnIndex(measurement);
+ lastPair = lastCacheContainer.getCachedLast(index);
+ if (lastPair != null
+ && startTime <= lastPair.getTimestamp()
+ && lastPair.getTimestamp() <= endTime) {
+ lastCacheContainer.resetLastCache(index);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "[tryToDeleteLastCache] Last cache for path: {} is set to null",
+ path.concatNode(measurement).getFullPath());
+ }
+ }
+ }
+ } else {
+ lastPair = lastCacheContainer.getCachedLast();
+ if (lastPair != null
+ && startTime <= lastPair.getTimestamp()
+ && lastPair.getTimestamp() <= endTime) {
+ lastCacheContainer.resetLastCache();
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "[tryToDeleteLastCache] Last cache for path: {} is set to null", path.getFullPath());
+ }
+ }
+ }
+ }
+
+ /**
+ * get the last value of timeseries represented by given measurementMNode get last value from
+ * cache in measurementMNode if absent, get last value from file
+ *
+ * @param node measurementMNode representing the target timeseries
+ * @param queryContext query context
+ * @return the last value
+ */
+ public static long getLastTimeStamp(IMeasurementMNode node, QueryContext queryContext) {
+ TimeValuePair last = getLastCache(null, node);
+ if (last != null) {
+ return getLastCache(null, node).getTimestamp();
+ } else {
+ try {
+ QueryDataSource dataSource =
+ QueryResourceManager.getInstance()
+ .getQueryDataSource(node.getPartialPath(), queryContext, null);
+ Set<String> measurementSet = new HashSet<>();
+ measurementSet.add(node.getPartialPath().getFullPath());
+ LastPointReader lastReader =
+ new LastPointReader(
+ node.getPartialPath(),
+ node.getSchema().getType(),
+ measurementSet,
+ queryContext,
+ dataSource,
+ Long.MAX_VALUE,
+ null);
+ last = lastReader.readLastPoint();
+ return (last != null ? last.getTimestamp() : Long.MIN_VALUE);
+ } catch (Exception e) {
+ logger.error(
+ "Something wrong happened while trying to get last time value pair of {}",
+ node.getFullPath(),
+ e);
+ return Long.MIN_VALUE;
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/ILastCacheContainer.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/ILastCacheContainer.java
index 6a4d9f0..3767b51 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/ILastCacheContainer.java
@@ -16,43 +16,43 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.metadata.mnode;
-import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-
-/** This interface defines a MeasurementMNode's operation interfaces. */
-public interface IMeasurementMNode extends IMNode {
-
- @Override
- IEntityMNode getParent();
-
- IMeasurementSchema getSchema();
+package org.apache.iotdb.db.metadata.lastCache.container;
- void setSchema(IMeasurementSchema schema);
-
- TSDataType getDataType(String measurementId);
-
- int getMeasurementCount();
+import org.apache.iotdb.tsfile.read.TimeValuePair;
- String getAlias();
+/** this interface declares the operations of LastCache data */
+public interface ILastCacheContainer {
- void setAlias(String alias);
+ // if vector, entry need schema size to init LastCache Value list
+ void init(int size);
- long getOffset();
+ // get lastCache of monad timseries
+ TimeValuePair getCachedLast();
- void setOffset(long offset);
+ // get lastCache of vector timseries
+ TimeValuePair getCachedLast(int index);
- TriggerExecutor getTriggerExecutor();
+ /**
+ * update last point cache
+ *
+ * @param timeValuePair last point
+ * @param highPriorityUpdate whether it's a high priority update
+ * @param latestFlushedTime latest flushed time
+ */
+ void updateCachedLast(
+ TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime);
- void setTriggerExecutor(TriggerExecutor triggerExecutor);
+ // update lastCache for vector timseries
+ void updateCachedLast(
+ int index, TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime);
- TimeValuePair getCachedLast();
+ // reset all lastCache data of one timeseries(monad or vector)
+ void resetLastCache();
- void updateCachedLast(
- TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime);
+ // reset lastCache of vector's subsensor
+ void resetLastCache(int index);
- void resetCache();
+ // whether the entry contains lastCache Value.
+ boolean isEmpty();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/LastCacheContainer.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/LastCacheContainer.java
new file mode 100644
index 0000000..8ba9a99
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/LastCacheContainer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.lastCache.container;
+
+import org.apache.iotdb.db.metadata.lastCache.container.value.ILastCacheValue;
+import org.apache.iotdb.db.metadata.lastCache.container.value.UnaryLastCacheValue;
+import org.apache.iotdb.db.metadata.lastCache.container.value.VectorLastCacheValue;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+
+/**
+ * This class possesses the ILastCacheValue and implements the basic last cache operations.
+ *
+ * <p>The ILastCacheValue may be extended to ILastCacheValue List in future to support batched last
+ * value cache.
+ */
+public class LastCacheContainer implements ILastCacheContainer {
+
+ ILastCacheValue lastCacheValue;
+
+ @Override
+ public void init(int size) {
+ if (size > 1) {
+ lastCacheValue = new VectorLastCacheValue(size);
+ }
+ }
+
+ @Override
+ public TimeValuePair getCachedLast() {
+ return lastCacheValue == null ? null : lastCacheValue.getTimeValuePair();
+ }
+
+ @Override
+ public TimeValuePair getCachedLast(int index) {
+ return lastCacheValue == null ? null : lastCacheValue.getTimeValuePair(index);
+ }
+
+ @Override
+ public synchronized void updateCachedLast(
+ TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
+ if (timeValuePair == null || timeValuePair.getValue() == null) {
+ return;
+ }
+
+ if (lastCacheValue == null) {
+ // If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will
+ // update cache.
+ if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) {
+ lastCacheValue =
+ new UnaryLastCacheValue(timeValuePair.getTimestamp(), timeValuePair.getValue());
+ }
+ } else if (timeValuePair.getTimestamp() > lastCacheValue.getTimestamp()
+ || (timeValuePair.getTimestamp() == lastCacheValue.getTimestamp() && highPriorityUpdate)) {
+ lastCacheValue.setTimestamp(timeValuePair.getTimestamp());
+ lastCacheValue.setValue(timeValuePair.getValue());
+ }
+ }
+
+ @Override
+ public synchronized void updateCachedLast(
+ int index, TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
+ if (timeValuePair == null || timeValuePair.getValue() == null) {
+ return;
+ }
+
+ if (lastCacheValue.getTimeValuePair(index) == null) {
+ // If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will
+ // update cache.
+ if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) {
+ lastCacheValue.setTimestamp(index, timeValuePair.getTimestamp());
+ lastCacheValue.setValue(index, timeValuePair.getValue());
+ }
+ } else if (timeValuePair.getTimestamp() > lastCacheValue.getTimestamp(index)) {
+ lastCacheValue.setTimestamp(index, timeValuePair.getTimestamp());
+ lastCacheValue.setValue(index, timeValuePair.getValue());
+ } else if (timeValuePair.getTimestamp() == lastCacheValue.getTimestamp(index)) {
+ if (highPriorityUpdate || lastCacheValue.getValue(index) == null) {
+ lastCacheValue.setTimestamp(index, timeValuePair.getTimestamp());
+ lastCacheValue.setValue(index, timeValuePair.getValue());
+ }
+ }
+ }
+
+ @Override
+ public synchronized void resetLastCache() {
+ lastCacheValue = null;
+ }
+
+ @Override
+ public void resetLastCache(int index) {
+ if (lastCacheValue instanceof VectorLastCacheValue) {
+ lastCacheValue.setValue(index, null);
+ } else {
+ lastCacheValue = null;
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return lastCacheValue == null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/ILastCacheValue.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/ILastCacheValue.java
new file mode 100644
index 0000000..e4fc716
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/ILastCacheValue.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.lastCache.container.value;
+
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+// this interface declares the simplest storage operation of lastCacheValue
+public interface ILastCacheValue {
+
+ long getTimestamp();
+
+ void setTimestamp(long timestamp);
+
+ void setValue(TsPrimitiveType value);
+
+ TimeValuePair getTimeValuePair();
+
+ int getSize();
+
+ long getTimestamp(int index);
+
+ void setTimestamp(int index, long timestamp);
+
+ TsPrimitiveType getValue(int index);
+
+ void setValue(int index, TsPrimitiveType value);
+
+ TimeValuePair getTimeValuePair(int index);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/UnaryLastCacheValue.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/UnaryLastCacheValue.java
new file mode 100644
index 0000000..b8f21ce
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/UnaryLastCacheValue.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.lastCache.container.value;
+
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public class UnaryLastCacheValue implements ILastCacheValue {
+
+ private static final String INDEX_OPERATION_ON_MONAD_EXCEPTION =
+ "Cannot operate data on any index but 0 on MonadLastCacheValue";
+
+ private long timestamp;
+
+ private TsPrimitiveType value;
+
+ public UnaryLastCacheValue(long timestamp, TsPrimitiveType value) {
+ this.timestamp = timestamp;
+ this.value = value;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public void setValue(TsPrimitiveType value) {
+ this.value = value;
+ }
+
+ @Override
+ public TimeValuePair getTimeValuePair() {
+ return new TimeValuePair(timestamp, value);
+ }
+
+ @Override
+ public int getSize() {
+ return 1;
+ }
+
+ @Override
+ public long getTimestamp(int index) {
+ if (index == 0) {
+ return timestamp;
+ }
+ throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION);
+ }
+
+ @Override
+ public void setTimestamp(int index, long timestamp) {
+ if (index == 0) {
+ this.timestamp = timestamp;
+ }
+ throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION);
+ }
+
+ @Override
+ public TsPrimitiveType getValue(int index) {
+ if (index == 0) {
+ return value;
+ }
+ throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION);
+ }
+
+ @Override
+ public void setValue(int index, TsPrimitiveType value) {
+ if (index == 0) {
+ this.value = value;
+ }
+ throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION);
+ }
+
+ @Override
+ public TimeValuePair getTimeValuePair(int index) {
+ if (index != 0) {
+ throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION);
+ } else if (value == null) {
+ return null;
+ } else {
+ return new TimeValuePair(timestamp, value);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/VectorLastCacheValue.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/VectorLastCacheValue.java
new file mode 100644
index 0000000..d5a3ada
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/VectorLastCacheValue.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.lastCache.container.value;
+
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+// this class defines the storage of vector lastCache data
+public class VectorLastCacheValue implements ILastCacheValue {
+
+ // the last point data of different subSensors may vary from each other on timestamp
+ private long[] timestamps;
+
+ private TsPrimitiveType[] values;
+
+ public VectorLastCacheValue(int size) {
+ timestamps = new long[size];
+ values = new TsPrimitiveType[size];
+ }
+
+ @Override
+ public int getSize() {
+ return values.length;
+ }
+
+ @Override
+ public long getTimestamp(int index) {
+ return timestamps[index];
+ }
+
+ @Override
+ public void setTimestamp(int index, long timestamp) {
+ timestamps[index] = timestamp;
+ }
+
+ @Override
+ public TsPrimitiveType getValue(int index) {
+ return values == null ? null : values[index];
+ }
+
+ @Override
+ public void setValue(int index, TsPrimitiveType value) {
+ values[index] = value;
+ }
+
+ @Override
+ public TimeValuePair getTimeValuePair(int index) {
+ if (values == null || index < 0 || index >= values.length || values[index] == null) {
+ return null;
+ }
+ return new TimeValuePair(timestamps[index], values[index]);
+ }
+
+ @Override
+ public long getTimestamp() {
+ return 0;
+ }
+
+ @Override
+ public void setTimestamp(long timestamp) {}
+
+ @Override
+ public void setValue(TsPrimitiveType value) {}
+
+ @Override
+ public TimeValuePair getTimeValuePair() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/EntityMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/EntityMNode.java
index 93c3791..11e7f91 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/EntityMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/EntityMNode.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.metadata.mnode;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.db.metadata.lastCache.container.LastCacheContainer;
+
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,6 +37,8 @@ public class EntityMNode extends InternalMNode implements IEntityMNode {
private volatile boolean useTemplate = false;
+ private volatile Map<String, ILastCacheContainer> lastCacheMap = null;
+
/**
* Constructor of MNode.
*
@@ -110,6 +115,26 @@ public class EntityMNode extends InternalMNode implements IEntityMNode {
this.useTemplate = useTemplate;
}
+ public ILastCacheContainer getLastCacheContainer(String measurementId) {
+ checkLastCacheMap();
+ return lastCacheMap.computeIfAbsent(measurementId, k -> new LastCacheContainer());
+ }
+
+ @Override
+ public Map<String, ILastCacheContainer> getTemplateLastCaches() {
+ return lastCacheMap == null ? Collections.emptyMap() : lastCacheMap;
+ }
+
+ private void checkLastCacheMap() {
+ if (lastCacheMap == null) {
+ synchronized (this) {
+ if (lastCacheMap == null) {
+ lastCacheMap = new ConcurrentHashMap<>();
+ }
+ }
+ }
+ }
+
@Override
public boolean isEntity() {
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IEntityMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IEntityMNode.java
index 5b1369a..98368e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IEntityMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IEntityMNode.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.metadata.mnode;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+
import java.util.Map;
public interface IEntityMNode extends IMNode {
@@ -34,6 +36,10 @@ public interface IEntityMNode extends IMNode {
void setUseTemplate(boolean useTemplate);
+ ILastCacheContainer getLastCacheContainer(String measurementId);
+
+ Map<String, ILastCacheContainer> getTemplateLastCaches();
+
static IEntityMNode setToEntity(IMNode node) {
IEntityMNode entityMNode;
if (node.isEntity()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java
index 6a4d9f0..2f7ae73 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.metadata.mnode;
import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
/** This interface defines a MeasurementMNode's operation interfaces. */
@@ -49,10 +49,7 @@ public interface IMeasurementMNode extends IMNode {
void setTriggerExecutor(TriggerExecutor triggerExecutor);
- TimeValuePair getCachedLast();
+ ILastCacheContainer getLastCacheContainer();
- void updateCachedLast(
- TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime);
-
- void resetCache();
+ void setLastCacheContainer(ILastCacheContainer lastCacheContainer);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
index 27480e8..1f9430a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
@@ -20,13 +20,14 @@ package org.apache.iotdb.db.metadata.mnode;
import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.db.metadata.lastCache.container.LastCacheContainer;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -55,7 +56,7 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
private long offset = -1;
/** last value cache */
- private TimeValuePair cachedLastValuePair = null;
+ private volatile ILastCacheContainer lastCacheContainer = null;
/** registered trigger */
private TriggerExecutor triggerExecutor = null;
@@ -153,42 +154,20 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
}
@Override
- public TimeValuePair getCachedLast() {
- return cachedLastValuePair;
- }
-
- /**
- * update last point cache
- *
- * @param timeValuePair last point
- * @param highPriorityUpdate whether it's a high priority update
- * @param latestFlushedTime latest flushed time
- */
- @Override
- public synchronized void updateCachedLast(
- TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
- if (timeValuePair == null || timeValuePair.getValue() == null) {
- return;
- }
-
- if (cachedLastValuePair == null) {
- // If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will
- // update cache.
- if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) {
- cachedLastValuePair =
- new TimeValuePair(timeValuePair.getTimestamp(), timeValuePair.getValue());
+ public ILastCacheContainer getLastCacheContainer() {
+ if (lastCacheContainer == null) {
+ synchronized (this) {
+ if (lastCacheContainer == null) {
+ lastCacheContainer = new LastCacheContainer();
+ }
}
- } else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp()
- || (timeValuePair.getTimestamp() == cachedLastValuePair.getTimestamp()
- && highPriorityUpdate)) {
- cachedLastValuePair.setTimestamp(timeValuePair.getTimestamp());
- cachedLastValuePair.setValue(timeValuePair.getValue());
}
+ return lastCacheContainer;
}
@Override
- public void resetCache() {
- cachedLastValuePair = null;
+ public void setLastCacheContainer(ILastCacheContainer lastCacheContainer) {
+ this.lastCacheContainer = lastCacheContainer;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
index e9f9b2e..ba6fa8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.MTree;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
@@ -152,7 +152,8 @@ public class TagManager {
allMatchedNodes.stream()
.sorted(
Comparator.comparingLong(
- (IMeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context))
+ (IMeasurementMNode mNode) ->
+ LastCacheManager.getLastTimeStamp(mNode, context))
.reversed()
.thenComparing(IMNode::getFullPath))
.collect(toList());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 0c1c19c..ed9888b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -270,7 +271,17 @@ public class LastQueryExecutor {
try {
node = (IMeasurementMNode) IoTDB.metaManager.getNodeByPath(path);
} catch (MetadataException e) {
- TimeValuePair timeValuePair = IoTDB.metaManager.getLastCache(path);
+ TimeValuePair timeValuePair;
+ // cluster mode may not get remote node
+ if (path instanceof VectorPartialPath) {
+ // the seriesPath has been transformed to vector path
+ // here needs subSensor path
+ timeValuePair =
+ IoTDB.metaManager.getLastCache(
+ ((VectorPartialPath) path).getSubSensorsPathList().get(0));
+ } else {
+ timeValuePair = IoTDB.metaManager.getLastCache(path);
+ }
if (timeValuePair != null) {
return timeValuePair;
}
@@ -279,11 +290,34 @@ public class LastQueryExecutor {
if (node == null) {
return null;
}
- return node.getCachedLast();
+
+ if (path instanceof VectorPartialPath) {
+ // the seriesPath has been transformed to vector path
+ // here needs subSensor path
+ return IoTDB.metaManager.getLastCache(
+ node, ((VectorPartialPath) path).getSubSensorsPathList().get(0).getMeasurement());
+ } else {
+ return IoTDB.metaManager.getLastCache(node);
+ }
}
public void write(TimeValuePair pair) {
- IoTDB.metaManager.updateLastCache(path, pair, false, Long.MIN_VALUE, node);
+ if (node == null) {
+ IoTDB.metaManager.updateLastCache(path, pair, false, Long.MIN_VALUE);
+ } else {
+ if (path instanceof VectorPartialPath) {
+ // the seriesPath has been transformed to vector path
+ // here needs subSensor path
+ IoTDB.metaManager.updateLastCache(
+ node,
+ ((VectorPartialPath) path).getSubSensorsPathList().get(0).getMeasurement(),
+ pair,
+ false,
+ Long.MIN_VALUE);
+ } else {
+ IoTDB.metaManager.updateLastCache(node, pair, false, Long.MIN_VALUE);
+ }
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 71db7cb..9b84d52 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -162,7 +162,7 @@ public class TTLTest {
plan.setMeasurementMNodes(
new IMeasurementMNode[] {
new MeasurementMNode(
- null, null, new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null)
+ null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null)
});
plan.transferType();
@@ -195,7 +195,7 @@ public class TTLTest {
plan.setMeasurementMNodes(
new IMeasurementMNode[] {
new MeasurementMNode(
- null, null, new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null)
+ null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null)
});
plan.transferType();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
index 234e0d7..e30e923 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.db.integration;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.mnode.IMNode;
-import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -225,16 +223,14 @@ public class IoTDBLastIT {
}
}
- IMeasurementMNode node =
- (IMeasurementMNode)
- IoTDB.metaManager.getNodeByPath(new PartialPath("root.ln.wf01.wt01.temperature"));
- node.resetCache();
+ PartialPath path = new PartialPath("root.ln.wf01.wt01.temperature");
+ IoTDB.metaManager.resetLastCache(path);
statement.execute(
"insert into root.ln.wf01.wt01(time, temperature, status, id) values(700, 33.1, false, 3)");
// Last cache is updated with above insert sql
- long time = node.getCachedLast().getTimestamp();
+ long time = IoTDB.metaManager.getLastCache(path).getTimestamp();
Assert.assertEquals(700, time);
hasResultSet = statement.execute("select last temperature,status,id from root.ln.wf01.wt01");
@@ -258,7 +254,7 @@ public class IoTDBLastIT {
"insert into root.ln.wf01.wt01(time, temperature, status, id) values(600, 19.1, false, 1)");
// Last cache is not updated with above insert sql
- time = node.getCachedLast().getTimestamp();
+ time = IoTDB.metaManager.getLastCache(path).getTimestamp();
Assert.assertEquals(700, time);
hasResultSet = statement.execute("select last temperature,status,id from root.ln.wf01.wt01");
@@ -297,9 +293,9 @@ public class IoTDBLastIT {
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- IMNode node =
- IoTDB.metaManager.getNodeByPath(new PartialPath("root.ln.wf01.wt02.temperature"));
- ((IMeasurementMNode) node).resetCache();
+ PartialPath path = new PartialPath("root.ln.wf01.wt02.temperature");
+ IoTDB.metaManager.resetLastCache(path);
+
boolean hasResultSet =
statement.execute("select last temperature,status,id from root.ln.wf01.wt02");
@@ -343,7 +339,7 @@ public class IoTDBLastIT {
}
Assert.assertEquals(cnt, retArray.length);
- ((IMeasurementMNode) node).resetCache();
+ IoTDB.metaManager.resetLastCache(path);
String[] retArray3 =
new String[] {
"900,root.ln.wf01.wt01.temperature,10.2,DOUBLE",
@@ -387,9 +383,7 @@ public class IoTDBLastIT {
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- IMNode node =
- IoTDB.metaManager.getNodeByPath(new PartialPath("root.ln.wf01.wt03.temperature"));
- ((IMeasurementMNode) node).resetCache();
+ IoTDB.metaManager.resetLastCache(new PartialPath("root.ln.wf01.wt03.temperature"));
statement.execute(
"INSERT INTO root.ln.wf01.wt03(timestamp,status, id) values(500, false, 9)");
@@ -438,9 +432,7 @@ public class IoTDBLastIT {
statement.execute("INSERT INTO root.ln.wf01.wt04(timestamp,temperature) values(150,31.2)");
statement.execute("flush");
- IMNode node =
- IoTDB.metaManager.getNodeByPath(new PartialPath("root.ln.wf01.wt04.temperature"));
- ((IMeasurementMNode) node).resetCache();
+ IoTDB.metaManager.resetLastCache(new PartialPath("root.ln.wf01.wt04.temperature"));
boolean hasResultSet = statement.execute("select last temperature from root.ln.wf01.wt04");
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
index 21e3c01..b31cbcc 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.service.IoTDB;
@@ -218,13 +219,12 @@ public class MManagerAdvancedTest {
TimeValuePair tv1 = new TimeValuePair(1000, TsPrimitiveType.getByType(TSDataType.DOUBLE, 1.0));
TimeValuePair tv2 = new TimeValuePair(2000, TsPrimitiveType.getByType(TSDataType.DOUBLE, 3.0));
TimeValuePair tv3 = new TimeValuePair(1500, TsPrimitiveType.getByType(TSDataType.DOUBLE, 2.5));
- IMNode node = mmanager.getNodeByPath(new PartialPath("root.vehicle.d2.s0"));
- ((IMeasurementMNode) node).updateCachedLast(tv1, true, Long.MIN_VALUE);
- ((IMeasurementMNode) node).updateCachedLast(tv2, true, Long.MIN_VALUE);
- Assert.assertEquals(
- tv2.getTimestamp(), ((IMeasurementMNode) node).getCachedLast().getTimestamp());
- ((IMeasurementMNode) node).updateCachedLast(tv3, true, Long.MIN_VALUE);
- Assert.assertEquals(
- tv2.getTimestamp(), ((IMeasurementMNode) node).getCachedLast().getTimestamp());
+ PartialPath path = new PartialPath("root.vehicle.d2.s0");
+ IMeasurementMNode node = (IMeasurementMNode) mmanager.getNodeByPath(path);
+ LastCacheManager.updateLastCache(path, tv1, true, Long.MIN_VALUE, node);
+ LastCacheManager.updateLastCache(path, tv2, true, Long.MIN_VALUE, node);
+ Assert.assertEquals(tv2.getTimestamp(), mmanager.getLastCache(node).getTimestamp());
+ LastCacheManager.updateLastCache(path, tv3, true, Long.MIN_VALUE, node);
+ Assert.assertEquals(tv2.getTimestamp(), mmanager.getLastCache(node).getTimestamp());
}
}