You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/11/26 03:46:28 UTC

[incubator-iotdb] branch add_load_tsfile_feature updated: add remove tsfile and move tsfile to target directory

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch add_load_tsfile_feature
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/add_load_tsfile_feature by this push:
     new 734c217  add remove tsfile and move tsfile to target directory
734c217 is described below

commit 734c2179bb1d2761870599e09e36483dc21ff9c5
Author: lta <li...@163.com>
AuthorDate: Tue Nov 26 11:46:08 2019 +0800

    add remove tsfile and move tsfile to target directory
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 11 ++-
 .../engine/storagegroup/StorageGroupProcessor.java | 95 +++++++++++++++++-----
 .../db/engine/storagegroup/TsFileResource.java     |  8 ++
 .../iotdb/db/qp/executor/QueryProcessExecutor.java | 65 +++++++++++++--
 .../iotdb/db/sync/receiver/load/FileLoader.java    | 41 ++--------
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 62 ++++++++++++++
 6 files changed, 220 insertions(+), 62 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 392fdc1..1fa1ced 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -337,6 +337,7 @@ public class StorageEngine implements IService {
 
   /**
    * count all Tsfiles which need to be upgraded
+   *
    * @return total num of the tsfiles which need to be upgraded
    */
   public int countUpgradeFiles() {
@@ -433,8 +434,14 @@ public class StorageEngine implements IService {
         .loadNewTsFile(newTsFileResource);
   }
 
-  public void deleteTsfile(File deletedTsfile) throws StorageEngineException {
-    getProcessor(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+  public boolean deleteTsfile(File deletedTsfile) throws StorageEngineException {
+    return getProcessor(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+  }
+
+  public boolean moveTsfile(File deletedTsfile, File targetDir)
+      throws StorageEngineException, IOException {
+    return getProcessor(deletedTsfile.getParentFile().getName())
+        .moveTsfile(deletedTsfile, targetDir);
   }
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index fc64ddc..a21e6d9 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -59,8 +59,8 @@ import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
@@ -70,10 +70,9 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.JobFileManager;
-import org.apache.iotdb.db.utils.UpgradeUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -297,7 +296,8 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void recoverUnseqFiles(List<TsFileResource> tsFiles) throws StorageGroupProcessorException {
+  private void recoverUnseqFiles(List<TsFileResource> tsFiles)
+      throws StorageGroupProcessorException {
     for (TsFileResource tsFileResource : tsFiles) {
       unSequenceFileList.add(tsFileResource);
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
@@ -1295,29 +1295,30 @@ public class StorageGroupProcessor {
    *
    * Secondly, delete the tsfile and .resource file.
    *
-   * @param deletedTsfile tsfile to be deleted
-   * @UsedBy sync module.
+   * @param tsfieToBeDeleted tsfile to be deleted
+   * @return true if a matching tsfile was found in the file lists and deleted, else false.
+   * @UsedBy sync module, load external tsfile module.
    */
-  public void deleteTsfile(File deletedTsfile) {
+  public boolean deleteTsfile(File tsfieToBeDeleted) {
     writeLock();
     mergeLock.writeLock().lock();
-    TsFileResource deletedTsFileResource = null;
+    TsFileResource tsFileResourceToBeDeleted = null;
     try {
       Iterator<TsFileResource> sequenceIterator = sequenceFileList.iterator();
       while (sequenceIterator.hasNext()) {
         TsFileResource sequenceResource = sequenceIterator.next();
-        if (sequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
-          deletedTsFileResource = sequenceResource;
+        if (sequenceResource.getFile().getName().equals(tsfieToBeDeleted.getName())) {
+          tsFileResourceToBeDeleted = sequenceResource;
           sequenceIterator.remove();
           break;
         }
       }
-      if (deletedTsFileResource == null) {
+      if (tsFileResourceToBeDeleted == null) {
         Iterator<TsFileResource> unsequenceIterator = unSequenceFileList.iterator();
         while (unsequenceIterator.hasNext()) {
           TsFileResource unsequenceResource = unsequenceIterator.next();
-          if (unsequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
-            deletedTsFileResource = unsequenceResource;
+          if (unsequenceResource.getFile().getName().equals(tsfieToBeDeleted.getName())) {
+            tsFileResourceToBeDeleted = unsequenceResource;
             unsequenceIterator.remove();
             break;
           }
@@ -1327,19 +1328,75 @@ public class StorageGroupProcessor {
       mergeLock.writeLock().unlock();
       writeUnlock();
     }
-    if (deletedTsFileResource == null) {
-      return;
+    if (tsFileResourceToBeDeleted == null) {
+      return false;
     }
-    deletedTsFileResource.getWriteQueryLock().writeLock().lock();
+    tsFileResourceToBeDeleted.getWriteQueryLock().writeLock().lock();
     try {
-      logger.info("Delete tsfile {} in sync loading process.", deletedTsFileResource.getFile());
-      deletedTsFileResource.remove();
+      tsFileResourceToBeDeleted.remove();
+      logger.info("Delete tsfile {} successfully.", tsFileResourceToBeDeleted.getFile());
     } finally {
-      deletedTsFileResource.getWriteQueryLock().writeLock().unlock();
+      tsFileResourceToBeDeleted.getWriteQueryLock().writeLock().unlock();
     }
+    return true;
   }
 
 
+  /**
+   * Move tsfile to the target directory if it exists.
+   *
+   * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
+   *
+   * Secondly, move the tsfile and .resource file to the target directory.
+   *
+   * @param fileToBeMoved tsfile to be moved
+   * @return true if a matching tsfile was found in the file lists and moved, else false.
+   * @UsedBy load external tsfile module.
+   */
+  public boolean moveTsfile(File fileToBeMoved, File targetDir) throws IOException {
+    writeLock();
+    mergeLock.writeLock().lock();
+    TsFileResource tsFileResourceToBeMoved = null;
+    try {
+      Iterator<TsFileResource> sequenceIterator = sequenceFileList.iterator();
+      while (sequenceIterator.hasNext()) {
+        TsFileResource sequenceResource = sequenceIterator.next();
+        if (sequenceResource.getFile().getName().equals(fileToBeMoved.getName())) {
+          tsFileResourceToBeMoved = sequenceResource;
+          sequenceIterator.remove();
+          break;
+        }
+      }
+      if (tsFileResourceToBeMoved == null) {
+        Iterator<TsFileResource> unsequenceIterator = unSequenceFileList.iterator();
+        while (unsequenceIterator.hasNext()) {
+          TsFileResource unsequenceResource = unsequenceIterator.next();
+          if (unsequenceResource.getFile().getName().equals(fileToBeMoved.getName())) {
+            tsFileResourceToBeMoved = unsequenceResource;
+            unsequenceIterator.remove();
+            break;
+          }
+        }
+      }
+    } finally {
+      mergeLock.writeLock().unlock();
+      writeUnlock();
+    }
+    if (tsFileResourceToBeMoved == null) {
+      return false;
+    }
+    tsFileResourceToBeMoved.getWriteQueryLock().writeLock().lock();
+    try {
+      tsFileResourceToBeMoved.moveTo(targetDir);
+      logger
+          .info("Move tsfile {} to target dir {} successfully.", tsFileResourceToBeMoved.getFile(),
+              targetDir.getPath());
+    } finally {
+      tsFileResourceToBeMoved.getWriteQueryLock().writeLock().unlock();
+    }
+    return true;
+  }
+
   public TsFileProcessor getWorkSequenceTsFileProcessor() {
     return workSequenceTsFileProcessor;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index b1b3f7b..68a967a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
@@ -258,6 +259,13 @@ public class TsFileResource {
     fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete();
   }
 
+  public void moveTo(File targetDir) throws IOException {
+    FileUtils.moveFile(file, new File(targetDir, file.getName()));
+    FileUtils.moveFile(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX),
+        new File(targetDir, file.getName() + RESOURCE_SUFFIX));
+    fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete();
+  }
+
   @Override
   public String toString() {
     return file.toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 6fb4956..81e6f5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -22,6 +22,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.PRIVILEGE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ROLE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.USER;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,8 +39,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.path.PathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
@@ -67,6 +69,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.db.utils.AuthUtils;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.cache.CacheException;
@@ -156,16 +159,68 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
     }
   }
 
-  private void operateLoadFiles(OperateFilePlan plan){
-
+  private void operateLoadFiles(OperateFilePlan plan) throws QueryProcessException {
+    File file = plan.getFile();
+    if (!file.exists()) {
+      throw new QueryProcessException(
+          String.format("File path %s doesn't exists.", file.getPath()));
+    }
+    if (file.isDirectory()) {
+      recursionFileDir(file);
+    } else {
+      loadFile(file);
+    }
   }
 
-  private void operateRemoveFile(OperateFilePlan plan){
+  private void recursionFileDir(File curFile) throws QueryProcessException {
+    File[] files = curFile.listFiles();
+    for (File file : files) {
+      if (file.isDirectory()) {
+        recursionFileDir(file);
+      } else {
+        loadFile(file);
+      }
+    }
+  }
 
+  private void loadFile(File file) throws QueryProcessException {
+    TsFileResource tsFileResource = new TsFileResource(file);
+    try {
+      FileLoaderUtils.checkTsFileResource(tsFileResource);
+      //TODO check version and metadata
+    } catch (IOException e) {
+      throw new QueryProcessException(
+          String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
+    }
   }
 
-  private void operateMoveFile(OperateFilePlan plan){
+  private void operateRemoveFile(OperateFilePlan plan) throws QueryProcessException {
+    try {
+      if (!StorageEngine.getInstance().deleteTsfile(plan.getFile())) {
+        throw new QueryProcessException(
+            String.format("File %s doesn't exists.", plan.getFile().getName()));
+      }
+    } catch (StorageEngineException e) {
+      throw new QueryProcessException(
+          String.format("Cannot remove file because %s", e.getMessage()));
+    }
+  }
 
+  private void operateMoveFile(OperateFilePlan plan) throws QueryProcessException {
+    if (!plan.getTargetDir().exists() || !plan.getTargetDir().isDirectory()) {
+      throw new QueryProcessException(
+          String.format("Target dir %s is invalid.", plan.getTargetDir().getPath()));
+    }
+    try {
+      if (!StorageEngine.getInstance().moveTsfile(plan.getFile(), plan.getTargetDir())) {
+        throw new QueryProcessException(
+            String.format("File %s doesn't exists.", plan.getFile().getName()));
+      }
+    } catch (StorageEngineException | IOException e) {
+      throw new QueryProcessException(
+          String.format("Cannot move file %s to target directory %s because %s",
+              plan.getFile().getPath(), plan.getTargetDir().getPath(), e.getMessage()));
+    }
   }
 
   private void operateTTL(SetTTLPlan plan) throws QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index b0c9df2..be2cb61 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.sync.receiver.load;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -31,12 +30,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.SyncDeviceOwnerConflictException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.sync.conf.SyncConstant;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,7 +133,7 @@ public class FileLoader implements IFileLoader {
       return;
     }
     TsFileResource tsFileResource = new TsFileResource(newTsFile);
-    checkTsFileResource(tsFileResource);
+    FileLoaderUtils.checkTsFileResource(tsFileResource);
     try {
       FileLoaderManager.getInstance().checkAndUpdateDeviceOwner(tsFileResource);
       StorageEngine.getInstance().loadNewTsFile(tsFileResource);
@@ -152,40 +146,15 @@ public class FileLoader implements IFileLoader {
     loadLog.finishLoadTsfile(newTsFile);
   }
 
-  private void checkTsFileResource(TsFileResource tsFileResource) throws IOException {
-    if (!tsFileResource.fileExists()) {
-      // .resource file does not exist, read file metadata and recover tsfile resource
-      try (TsFileSequenceReader reader = new TsFileSequenceReader(
-          tsFileResource.getFile().getAbsolutePath())) {
-        TsFileMetaData metaData = reader.readFileMetadata();
-        for (TsDeviceMetadataIndex index : metaData.getDeviceMap().values()) {
-          TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
-          List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
-              .getChunkGroupMetaDataList();
-          for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-            for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
-              tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
-                  chunkMetaData.getStartTime());
-              tsFileResource
-                  .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
-            }
-          }
-        }
-      }
-      // write .resource file
-      tsFileResource.serialize();
-    } else {
-      tsFileResource.deSerialize();
-    }
-  }
-
   private void loadDeletedFile(File deletedTsFile) throws IOException {
     if (curType != LoadType.DELETE) {
       loadLog.startLoadDeletedFiles();
       curType = LoadType.DELETE;
     }
     try {
-      StorageEngine.getInstance().deleteTsfile(deletedTsFile);
+      if (!StorageEngine.getInstance().deleteTsfile(deletedTsFile)) {
+        LOGGER.info("The file {} to be deleted doesn't exist.", deletedTsFile.getAbsolutePath());
+      }
     } catch (StorageEngineException e) {
       LOGGER.error("Can not load deleted tsfile {}", deletedTsFile.getAbsolutePath(), e);
       throw new IOException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
new file mode 100644
index 0000000..0cdc043
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+
+public class FileLoaderUtils {
+
+  private FileLoaderUtils() {
+  }
+
+  public static void checkTsFileResource(TsFileResource tsFileResource) throws IOException {
+    if (!tsFileResource.fileExists()) {
+      // .resource file does not exist, read file metadata and recover tsfile resource
+      try (TsFileSequenceReader reader = new TsFileSequenceReader(
+          tsFileResource.getFile().getAbsolutePath())) {
+        TsFileMetaData metaData = reader.readFileMetadata();
+        for (TsDeviceMetadataIndex index : metaData.getDeviceMap().values()) {
+          TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
+          List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
+              .getChunkGroupMetaDataList();
+          for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
+            for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+              tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
+                  chunkMetaData.getStartTime());
+              tsFileResource
+                  .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+            }
+          }
+        }
+      }
+      // write .resource file
+      tsFileResource.serialize();
+    } else {
+      tsFileResource.deSerialize();
+    }
+  }
+}