You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/11/26 03:46:28 UTC
[incubator-iotdb] branch add_load_tsfile_feature updated: add remove tsfile and move tsfile to target directory
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch add_load_tsfile_feature
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/add_load_tsfile_feature by this push:
new 734c217 add remove tsfile and move tsfile to target directory
734c217 is described below
commit 734c2179bb1d2761870599e09e36483dc21ff9c5
Author: lta <li...@163.com>
AuthorDate: Tue Nov 26 11:46:08 2019 +0800
add remove tsfile and move tsfile to target directory
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 11 ++-
.../engine/storagegroup/StorageGroupProcessor.java | 95 +++++++++++++++++-----
.../db/engine/storagegroup/TsFileResource.java | 8 ++
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 65 +++++++++++++--
.../iotdb/db/sync/receiver/load/FileLoader.java | 41 ++--------
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 62 ++++++++++++++
6 files changed, 220 insertions(+), 62 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 392fdc1..1fa1ced 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -337,6 +337,7 @@ public class StorageEngine implements IService {
/**
* count all Tsfiles which need to be upgraded
+ *
* @return total num of the tsfiles which need to be upgraded
*/
public int countUpgradeFiles() {
@@ -433,8 +434,14 @@ public class StorageEngine implements IService {
.loadNewTsFile(newTsFileResource);
}
- public void deleteTsfile(File deletedTsfile) throws StorageEngineException {
- getProcessor(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+ public boolean deleteTsfile(File deletedTsfile) throws StorageEngineException {
+ return getProcessor(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+ }
+
+ public boolean moveTsfile(File deletedTsfile, File targetDir)
+ throws StorageEngineException, IOException {
+ return getProcessor(deletedTsfile.getParentFile().getName())
+ .moveTsfile(deletedTsfile, targetDir);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index fc64ddc..a21e6d9 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -59,8 +59,8 @@ import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
@@ -70,10 +70,9 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.JobFileManager;
-import org.apache.iotdb.db.utils.UpgradeUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -297,7 +296,8 @@ public class StorageGroupProcessor {
}
}
- private void recoverUnseqFiles(List<TsFileResource> tsFiles) throws StorageGroupProcessorException {
+ private void recoverUnseqFiles(List<TsFileResource> tsFiles)
+ throws StorageGroupProcessorException {
for (TsFileResource tsFileResource : tsFiles) {
unSequenceFileList.add(tsFileResource);
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
@@ -1295,29 +1295,30 @@ public class StorageGroupProcessor {
*
* Secondly, delete the tsfile and .resource file.
*
- * @param deletedTsfile tsfile to be deleted
- * @UsedBy sync module.
+ * @param tsfieToBeDeleted tsfile to be deleted
+ * @return whether the file to be deleted exists.
+ * @UsedBy sync module, load external tsfile module.
*/
- public void deleteTsfile(File deletedTsfile) {
+ public boolean deleteTsfile(File tsfieToBeDeleted) {
writeLock();
mergeLock.writeLock().lock();
- TsFileResource deletedTsFileResource = null;
+ TsFileResource tsFileResourceToBeDeleted = null;
try {
Iterator<TsFileResource> sequenceIterator = sequenceFileList.iterator();
while (sequenceIterator.hasNext()) {
TsFileResource sequenceResource = sequenceIterator.next();
- if (sequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
- deletedTsFileResource = sequenceResource;
+ if (sequenceResource.getFile().getName().equals(tsfieToBeDeleted.getName())) {
+ tsFileResourceToBeDeleted = sequenceResource;
sequenceIterator.remove();
break;
}
}
- if (deletedTsFileResource == null) {
+ if (tsFileResourceToBeDeleted == null) {
Iterator<TsFileResource> unsequenceIterator = unSequenceFileList.iterator();
while (unsequenceIterator.hasNext()) {
TsFileResource unsequenceResource = unsequenceIterator.next();
- if (unsequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
- deletedTsFileResource = unsequenceResource;
+ if (unsequenceResource.getFile().getName().equals(tsfieToBeDeleted.getName())) {
+ tsFileResourceToBeDeleted = unsequenceResource;
unsequenceIterator.remove();
break;
}
@@ -1327,19 +1328,75 @@ public class StorageGroupProcessor {
mergeLock.writeLock().unlock();
writeUnlock();
}
- if (deletedTsFileResource == null) {
- return;
+ if (tsFileResourceToBeDeleted == null) {
+ return false;
}
- deletedTsFileResource.getWriteQueryLock().writeLock().lock();
+ tsFileResourceToBeDeleted.getWriteQueryLock().writeLock().lock();
try {
- logger.info("Delete tsfile {} in sync loading process.", deletedTsFileResource.getFile());
- deletedTsFileResource.remove();
+ tsFileResourceToBeDeleted.remove();
+ logger.info("Delete tsfile {} successfully.", tsFileResourceToBeDeleted.getFile());
} finally {
- deletedTsFileResource.getWriteQueryLock().writeLock().unlock();
+ tsFileResourceToBeDeleted.getWriteQueryLock().writeLock().unlock();
}
+ return true;
}
+ /**
+ * Move tsfile to the target directory if it exists.
+ *
+ * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
+ *
+ * Secondly, move the tsfile and .resource file to the target directory.
+ *
+ * @param fileToBeMoved tsfile to be moved
+ * @return whether the file to be moved exists.
+ * @UsedBy load external tsfile module.
+ */
+ public boolean moveTsfile(File fileToBeMoved, File targetDir) throws IOException {
+ writeLock();
+ mergeLock.writeLock().lock();
+ TsFileResource tsFileResourceToBeMoved = null;
+ try {
+ Iterator<TsFileResource> sequenceIterator = sequenceFileList.iterator();
+ while (sequenceIterator.hasNext()) {
+ TsFileResource sequenceResource = sequenceIterator.next();
+ if (sequenceResource.getFile().getName().equals(fileToBeMoved.getName())) {
+ tsFileResourceToBeMoved = sequenceResource;
+ sequenceIterator.remove();
+ break;
+ }
+ }
+ if (tsFileResourceToBeMoved == null) {
+ Iterator<TsFileResource> unsequenceIterator = unSequenceFileList.iterator();
+ while (unsequenceIterator.hasNext()) {
+ TsFileResource unsequenceResource = unsequenceIterator.next();
+ if (unsequenceResource.getFile().getName().equals(fileToBeMoved.getName())) {
+ tsFileResourceToBeMoved = unsequenceResource;
+ unsequenceIterator.remove();
+ break;
+ }
+ }
+ }
+ } finally {
+ mergeLock.writeLock().unlock();
+ writeUnlock();
+ }
+ if (tsFileResourceToBeMoved == null) {
+ return false;
+ }
+ tsFileResourceToBeMoved.getWriteQueryLock().writeLock().lock();
+ try {
+ tsFileResourceToBeMoved.moveTo(targetDir);
+ logger
+ .info("Move tsfile {} to target dir {} successfully.", tsFileResourceToBeMoved.getFile(),
+ targetDir.getPath());
+ } finally {
+ tsFileResourceToBeMoved.getWriteQueryLock().writeLock().unlock();
+ }
+ return true;
+ }
+
public TsFileProcessor getWorkSequenceTsFileProcessor() {
return workSequenceTsFileProcessor;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index b1b3f7b..68a967a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
@@ -258,6 +259,13 @@ public class TsFileResource {
fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete();
}
+ public void moveTo(File targetDir) throws IOException {
+ FileUtils.moveFile(file, new File(targetDir, file.getName()));
+ FileUtils.moveFile(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX),
+ new File(targetDir, file.getName() + RESOURCE_SUFFIX));
+ fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete();
+ }
+
@Override
public String toString() {
return file.toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 6fb4956..81e6f5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -22,6 +22,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.PRIVILEGE;
import static org.apache.iotdb.db.conf.IoTDBConstant.ROLE;
import static org.apache.iotdb.db.conf.IoTDBConstant.USER;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -38,8 +39,9 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
@@ -67,6 +69,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.db.utils.AuthUtils;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.cache.CacheException;
@@ -156,16 +159,68 @@ public class QueryProcessExecutor extends AbstractQueryProcessExecutor {
}
}
- private void operateLoadFiles(OperateFilePlan plan){
-
+ private void operateLoadFiles(OperateFilePlan plan) throws QueryProcessException {
+ File file = plan.getFile();
+ if (!file.exists()) {
+ throw new QueryProcessException(
+ String.format("File path %s doesn't exists.", file.getPath()));
+ }
+ if (file.isDirectory()) {
+ recursionFileDir(file);
+ } else {
+ loadFile(file);
+ }
}
- private void operateRemoveFile(OperateFilePlan plan){
+ private void recursionFileDir(File curFile) throws QueryProcessException {
+ File[] files = curFile.listFiles();
+ for (File file : files) {
+ if (file.isDirectory()) {
+ recursionFileDir(file);
+ } else {
+ loadFile(file);
+ }
+ }
+ }
+ private void loadFile(File file) throws QueryProcessException {
+ TsFileResource tsFileResource = new TsFileResource(file);
+ try {
+ FileLoaderUtils.checkTsFileResource(tsFileResource);
+ //TODO check version and metadata
+ } catch (IOException e) {
+ throw new QueryProcessException(
+ String.format("Cannot load file %s because %s", file.getAbsolutePath(), e.getMessage()));
+ }
}
- private void operateMoveFile(OperateFilePlan plan){
+ private void operateRemoveFile(OperateFilePlan plan) throws QueryProcessException {
+ try {
+ if (!StorageEngine.getInstance().deleteTsfile(plan.getFile())) {
+ throw new QueryProcessException(
+ String.format("File %s doesn't exists.", plan.getFile().getName()));
+ }
+ } catch (StorageEngineException e) {
+ throw new QueryProcessException(
+ String.format("Cannot remove file because %s", e.getMessage()));
+ }
+ }
+ private void operateMoveFile(OperateFilePlan plan) throws QueryProcessException {
+ if (!plan.getTargetDir().exists() || !plan.getTargetDir().isDirectory()) {
+ throw new QueryProcessException(
+ String.format("Target dir %s is invalid.", plan.getTargetDir().getPath()));
+ }
+ try {
+ if (!StorageEngine.getInstance().moveTsfile(plan.getFile(), plan.getTargetDir())) {
+ throw new QueryProcessException(
+ String.format("File %s doesn't exists.", plan.getFile().getName()));
+ }
+ } catch (StorageEngineException | IOException e) {
+ throw new QueryProcessException(
+ String.format("Cannot move file %s to target directory %s because %s",
+ plan.getFile().getPath(), plan.getTargetDir().getPath(), e.getMessage()));
+ }
}
private void operateTTL(SetTTLPlan plan) throws QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index b0c9df2..be2cb61 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.sync.receiver.load;
import java.io.File;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -31,12 +30,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.SyncDeviceOwnerConflictException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.sync.conf.SyncConstant;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,7 +133,7 @@ public class FileLoader implements IFileLoader {
return;
}
TsFileResource tsFileResource = new TsFileResource(newTsFile);
- checkTsFileResource(tsFileResource);
+ FileLoaderUtils.checkTsFileResource(tsFileResource);
try {
FileLoaderManager.getInstance().checkAndUpdateDeviceOwner(tsFileResource);
StorageEngine.getInstance().loadNewTsFile(tsFileResource);
@@ -152,40 +146,15 @@ public class FileLoader implements IFileLoader {
loadLog.finishLoadTsfile(newTsFile);
}
- private void checkTsFileResource(TsFileResource tsFileResource) throws IOException {
- if (!tsFileResource.fileExists()) {
- // .resource file does not exist, read file metadata and recover tsfile resource
- try (TsFileSequenceReader reader = new TsFileSequenceReader(
- tsFileResource.getFile().getAbsolutePath())) {
- TsFileMetaData metaData = reader.readFileMetadata();
- for (TsDeviceMetadataIndex index : metaData.getDeviceMap().values()) {
- TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
- List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
- .getChunkGroupMetaDataList();
- for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
- for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
- tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
- chunkMetaData.getStartTime());
- tsFileResource
- .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
- }
- }
- }
- }
- // write .resource file
- tsFileResource.serialize();
- } else {
- tsFileResource.deSerialize();
- }
- }
-
private void loadDeletedFile(File deletedTsFile) throws IOException {
if (curType != LoadType.DELETE) {
loadLog.startLoadDeletedFiles();
curType = LoadType.DELETE;
}
try {
- StorageEngine.getInstance().deleteTsfile(deletedTsFile);
+ if (!StorageEngine.getInstance().deleteTsfile(deletedTsFile)) {
+ LOGGER.info("The file {} to be deleted doesn't exist.", deletedTsFile.getAbsolutePath());
+ }
} catch (StorageEngineException e) {
LOGGER.error("Can not load deleted tsfile {}", deletedTsFile.getAbsolutePath(), e);
throw new IOException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
new file mode 100644
index 0000000..0cdc043
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+
+public class FileLoaderUtils {
+
+ private FileLoaderUtils() {
+ }
+
+ public static void checkTsFileResource(TsFileResource tsFileResource) throws IOException {
+ if (!tsFileResource.fileExists()) {
+ // .resource file does not exist, read file metadata and recover tsfile resource
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(
+ tsFileResource.getFile().getAbsolutePath())) {
+ TsFileMetaData metaData = reader.readFileMetadata();
+ for (TsDeviceMetadataIndex index : metaData.getDeviceMap().values()) {
+ TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
+ List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
+ .getChunkGroupMetaDataList();
+ for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
+ for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+ tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
+ chunkMetaData.getStartTime());
+ tsFileResource
+ .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+ }
+ }
+ }
+ }
+ // write .resource file
+ tsFileResource.serialize();
+ } else {
+ tsFileResource.deSerialize();
+ }
+ }
+}