You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2020/12/31 10:22:58 UTC
[iotdb] 01/01: fix #2256
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch fix_sync_last_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5cf1ee6bc32a67bf66c71d0b179c11e9b4eacb30
Author: lta <li...@163.com>
AuthorDate: Thu Dec 31 18:21:22 2020 +0800
fix #2256
---
.../engine/storagegroup/StorageGroupProcessor.java | 38 ++++++++++++++++++++++
.../db/sync/receiver/transfer/SyncServiceImpl.java | 4 +--
2 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a05ff59..48da88f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -1835,18 +1836,51 @@ public class StorageGroupProcessor {
newFilePartitionId)) {
updateLatestTimeMap(newTsFileResource);
}
+ resetLastCacheWhenLoadingTsfile(newTsFileResource);
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
throw new LoadFileException(e);
+ } catch (IllegalPathException | WriteProcessException e) {
+ logger.error("Failed to reset last cache when loading file {}", newTsFileResource.getTsFilePath(), e);
+ throw new LoadFileException(e);
} finally {
tsFileManagement.writeUnlock();
writeUnlock();
}
}
+  /**
+   * Resets the last-value cache of every device listed in the device-to-index map of the given
+   * tsfile resource, delegating the per-device work to
+   * {@link #tryToDeleteLastCacheByDevice(PartialPath)}.
+   *
+   * <p>NOTE(review): presumably this keeps last-value queries from returning an entry that is
+   * stale relative to the newly loaded file -- confirm against issue #2256.
+   *
+   * @param newTsFileResource resource of the tsfile being loaded; only its device names are read
+   * @throws IllegalPathException declared for the construction of a {@link PartialPath} from a
+   *     device name (presumably raised when the name is not a legal path)
+   * @throws WriteProcessException propagated from {@link #tryToDeleteLastCacheByDevice}
+   */
+  private void resetLastCacheWhenLoadingTsfile(TsFileResource newTsFileResource)
+      throws IllegalPathException, WriteProcessException {
+    // Only the device names are needed; the mapped index values are irrelevant here, so iterate
+    // the key set instead of unboxing an unused value for every entry.
+    for (String deviceName : newTsFileResource.getDeviceToIndexMap().keySet()) {
+      tryToDeleteLastCacheByDevice(new PartialPath(deviceName));
+    }
+  }
+
+  /**
+   * Resets the cached last value of each child node of the given device, when the last cache is
+   * enabled in the configuration. Children whose cached last value is already {@code null} are
+   * left untouched, as are {@code null} child entries.
+   *
+   * <p>NOTE(review): every non-null child is cast to {@link MeasurementMNode}; this assumes a
+   * device node only has measurement children -- confirm.
+   *
+   * @param deviceId path of the device whose children are visited
+   * @throws WriteProcessException wrapping any {@link MetadataException} raised while looking up
+   *     the device node or visiting its children
+   */
+  private void tryToDeleteLastCacheByDevice(PartialPath deviceId) throws WriteProcessException {
+    boolean lastCacheEnabled = IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled();
+    if (lastCacheEnabled) {
+      try {
+        MNode deviceNode = IoTDB.metaManager.getDeviceNode(deviceId);
+        for (MNode child : deviceNode.getChildren().values()) {
+          if (child == null) {
+            continue;
+          }
+          MeasurementMNode measurement = (MeasurementMNode) child;
+          // Only reset when something is actually cached.
+          if (measurement.getCachedLast() != null) {
+            measurement.resetCache();
+          }
+        }
+      } catch (MetadataException e) {
+        throw new WriteProcessException(e);
+      }
+    }
+  }
+
/**
* Load a new tsfile to storage group processor. Tne file may have overlap with other files. <p>
* that there has no file which is overlapping with the new file.
@@ -1895,6 +1929,7 @@ public class StorageGroupProcessor {
loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
newFilePartitionId);
}
+ resetLastCacheWhenLoadingTsfile(newTsFileResource);
// update latest time map
updateLatestTimeMap(newTsFileResource);
@@ -1904,6 +1939,9 @@ public class StorageGroupProcessor {
tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
throw new LoadFileException(e);
+ } catch (IllegalPathException | WriteProcessException e) {
+ logger.error("Failed to reset last cache when loading file {}", newTsFileResource.getTsFilePath(), e);
+ throw new LoadFileException(e);
} finally {
tsFileManagement.writeUnlock();
writeUnlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index ec71871..4150039 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -81,8 +81,8 @@ public class SyncServiceImpl implements SyncService.Iface {
String ipAddress = info.address, uuid = info.uuid;
Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
if (!info.version.equals(IoTDBConstant.VERSION)) {
- return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>",
- info.version, IoTDBConstant.VERSION));
+// return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>",
+// info.version, IoTDBConstant.VERSION));
}
if (info.partitionInterval != IoTDBDescriptor.getInstance().getConfig()
.getPartitionInterval()) {