You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/26 05:16:52 UTC
[incubator-iotdb] 01/01: try to fix last cache
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_tablet_last
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit effcddce34e34149e745538828902eed33cb4a88
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Apr 26 13:16:33 2020 +0800
try to fix last cache
---
.../db/engine/storagegroup/StorageGroupProcessor.java | 7 ++++---
.../iotdb/db/qp/physical/crud/InsertTabletPlan.java | 16 ++++++++--------
2 files changed, 12 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 803e147..74bfa63 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -575,6 +575,7 @@ public class StorageGroupProcessor {
return;
}
+ logger.info("@+++<<<: current batch start {} end {}", start, end);
TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
if (tsFileProcessor == null) {
for (int i = start; i < end; i++) {
@@ -601,7 +602,7 @@ public class StorageGroupProcessor {
}
long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
- tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
+ tryToUpdateBatchInsertLastCache(insertTabletPlan, end -1, globalLatestFlushedTime);
// check memtable size and may async try to flush the work memtable
if (tsFileProcessor.shouldFlush()) {
@@ -609,7 +610,7 @@ public class StorageGroupProcessor {
}
}
- public void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
+ public void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, int lastIndex, Long latestFlushedTime)
throws WriteProcessException {
MNode node = null;
try {
@@ -619,7 +620,7 @@ public class StorageGroupProcessor {
// Update cached last value with high priority
MNode measurementNode = node.getChild(measurementList[i]);
((LeafMNode) measurementNode)
- .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
+ .updateCachedLast(plan.composeLastTimeValuePair(i, lastIndex), true, latestFlushedTime);
}
} catch (MetadataException e) {
throw new WriteProcessException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index a26ac3b..65abe11 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -417,7 +417,7 @@ public class InsertTabletPlan extends PhysicalPlan {
return tmpMaxTime;
}
- public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
+ public TimeValuePair composeLastTimeValuePair(int measurementIndex, int lastIndex) {
if (measurementIndex >= columns.length) {
return null;
}
@@ -425,33 +425,33 @@ public class InsertTabletPlan extends PhysicalPlan {
switch (dataTypes[measurementIndex]) {
case INT32:
int[] intValues = (int[]) columns[measurementIndex];
- value = new TsInt(intValues[end - 1]);
+ value = new TsInt(intValues[lastIndex]);
break;
case INT64:
long[] longValues = (long[]) columns[measurementIndex];
- value = new TsLong(longValues[end - 1]);
+ value = new TsLong(longValues[lastIndex]);
break;
case FLOAT:
float[] floatValues = (float[]) columns[measurementIndex];
- value = new TsFloat(floatValues[end - 1]);
+ value = new TsFloat(floatValues[lastIndex]);
break;
case DOUBLE:
double[] doubleValues = (double[]) columns[measurementIndex];
- value = new TsDouble(doubleValues[end - 1]);
+ value = new TsDouble(doubleValues[lastIndex]);
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) columns[measurementIndex];
- value = new TsBoolean(boolValues[end - 1]);
+ value = new TsBoolean(boolValues[lastIndex]);
break;
case TEXT:
Binary[] binaryValues = (Binary[]) columns[measurementIndex];
- value = new TsBinary(binaryValues[end - 1]);
+ value = new TsBinary(binaryValues[lastIndex]);
break;
default:
throw new UnSupportedDataTypeException(
String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
}
- return new TimeValuePair(times[end - 1], value);
+ return new TimeValuePair(times[lastIndex], value);
}
public long[] getTimes() {