You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2020/12/31 10:22:58 UTC

[iotdb] 01/01: fix #2256

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch fix_sync_last_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5cf1ee6bc32a67bf66c71d0b179c11e9b4eacb30
Author: lta <li...@163.com>
AuthorDate: Thu Dec 31 18:21:22 2020 +0800

    fix #2256
---
 .../engine/storagegroup/StorageGroupProcessor.java | 38 ++++++++++++++++++++++
 .../db/sync/receiver/transfer/SyncServiceImpl.java |  4 +--
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a05ff59..48da88f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -1835,18 +1836,51 @@ public class StorageGroupProcessor {
           newFilePartitionId)) {
         updateLatestTimeMap(newTsFileResource);
       }
+      resetLastCacheWhenLoadingTsfile(newTsFileResource);
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
           tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
       throw new LoadFileException(e);
+    } catch (IllegalPathException | WriteProcessException e) {
+      logger.error("Failed to reset last cache when loading file {}", newTsFileResource.getTsFilePath(), e);
+      throw new LoadFileException(e);
     } finally {
       tsFileManagement.writeUnlock();
       writeUnlock();
     }
   }
 
+  private void resetLastCacheWhenLoadingTsfile(TsFileResource newTsFileResource)
+      throws IllegalPathException, WriteProcessException {
+    for (Entry<String, Integer> entry : newTsFileResource.getDeviceToIndexMap().entrySet()) {
+      PartialPath device = new PartialPath(entry.getKey());
+      int index = entry.getValue();
+      tryToDeleteLastCacheByDevice(device);
+    }
+  }
+
+  private void tryToDeleteLastCacheByDevice(PartialPath deviceId) throws WriteProcessException {
+    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
+      return;
+    }
+    try {
+      MNode node = IoTDB.metaManager.getDeviceNode(deviceId);
+
+      for (MNode measurementNode : node.getChildren().values()) {
+        if (measurementNode != null) {
+          TimeValuePair lastPair = ((MeasurementMNode) measurementNode).getCachedLast();
+          if (lastPair != null) {
+            ((MeasurementMNode) measurementNode).resetCache();
+          }
+        }
+      }
+    } catch (MetadataException e) {
+      throw new WriteProcessException(e);
+    }
+  }
+
   /**
    * Load a new tsfile to storage group processor. Tne file may have overlap with other files. <p>
    * that there has no file which is overlapping with the new file.
@@ -1895,6 +1929,7 @@ public class StorageGroupProcessor {
         loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
             newFilePartitionId);
       }
+      resetLastCacheWhenLoadingTsfile(newTsFileResource);
 
       // update latest time map
       updateLatestTimeMap(newTsFileResource);
@@ -1904,6 +1939,9 @@ public class StorageGroupProcessor {
           tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
       throw new LoadFileException(e);
+    } catch (IllegalPathException | WriteProcessException e) {
+      logger.error("Failed to reset last cache when loading file {}", newTsFileResource.getTsFilePath(), e);
+      throw new LoadFileException(e);
     } finally {
       tsFileManagement.writeUnlock();
       writeUnlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index ec71871..4150039 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -81,8 +81,8 @@ public class SyncServiceImpl implements SyncService.Iface {
     String ipAddress = info.address, uuid = info.uuid;
     Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
     if (!info.version.equals(IoTDBConstant.VERSION)) {
-      return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>",
-          info.version, IoTDBConstant.VERSION));
+//      return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>",
+//          info.version, IoTDBConstant.VERSION));
     }
     if (info.partitionInterval != IoTDBDescriptor.getInstance().getConfig()
         .getPartitionInterval()) {