You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/09/02 09:50:12 UTC
[incubator-iotdb] 02/03: complete load tsfiles module
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 1785cab5b4244c5251a9b43113b064e88913c5dd
Author: lta <li...@163.com>
AuthorDate: Mon Sep 2 17:22:39 2019 +0800
complete load tsfiles module
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 11 +
.../iotdb/db/engine/merge/task/MergeFileTask.java | 23 +-
.../engine/storagegroup/StorageGroupProcessor.java | 247 +++++++++++++++++++--
.../db/engine/storagegroup/TsFileResource.java | 4 +
.../iotdb/db/sync/receiver/load/FileLoader.java | 43 +++-
.../sender/recover/ISyncSenderLogAnalyzer.java | 3 +-
.../sync/sender/recover/SyncSenderLogAnalyzer.java | 7 +-
.../sync/sender/transfer/DataTransferManager.java | 2 +-
.../sender/recover/SyncSenderLogAnalyzerTest.java | 25 +++
9 files changed, 338 insertions(+), 27 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 3e36516..c96ef2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageEngineFailureException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -337,4 +338,14 @@ public class StorageEngine implements IService {
return true;
}
+
+ public void loadNewTsFile(File newTsFile, TsFileResource resource)
+ throws TsFileProcessorException {
+ processorMap.get(newTsFile.getParentFile().getName()).loadNewTsFile(newTsFile, resource);
+ }
+
+ public void deleteTsfile(File deletedTsfile){
+ processorMap.get(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+ }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index cf1f62d..68d3141 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.engine.merge.task;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -148,6 +151,12 @@ class MergeFileTask {
logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
newFileWriter.getFile().delete();
+
+ File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
+ FileUtils.moveFile(seqFile.getFile(), nextMergeVersionFile);
+ FileUtils.moveFile(new File(seqFile.getFile(), TsFileResource.RESOURCE_SUFFIX),
+ new File(nextMergeVersionFile, TsFileResource.RESOURCE_SUFFIX));
+ seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
}
@@ -209,12 +218,24 @@ class MergeFileTask {
TsFileMetaDataCache.getInstance().remove(seqFile);
DeviceMetaDataCache.getInstance().remove(seqFile);
seqFile.getFile().delete();
- FileUtils.moveFile(fileWriter.getFile(), seqFile.getFile());
+
+ File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
+ FileUtils.moveFile(fileWriter.getFile(), nextMergeVersionFile);
+ FileUtils.moveFile(new File(seqFile.getFile(), TsFileResource.RESOURCE_SUFFIX),
+ new File(nextMergeVersionFile, TsFileResource.RESOURCE_SUFFIX));
+ seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
}
}
+ private File getNextMergeVersionFile(File seqFile) {
+ String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "").split("-");
+ int mergeVersion = Integer.parseInt(splits[2]) + 1;
+ return new File(seqFile.getParentFile(),
+ splits[0] + "-" + splits[1] + "-" + mergeVersion + TSFILE_SUFFIX);
+ }
+
private long writeUnmergedChunks(List<Long> chunkStartTimes,
List<ChunkMetaData> chunkMetaDataList, TsFileSequenceReader reader,
RestorableTsFileIOWriter fileWriter) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1e41c57..4da8e30 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -38,6 +38,7 @@ import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
@@ -67,17 +68,17 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.JobFileManager;
-import org.apache.iotdb.rpc.TSStatusType;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
+import org.apache.iotdb.rpc.TSStatusType;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -206,7 +207,8 @@ public class StorageGroupProcessor {
try {
// collect TsFiles from sequential and unsequential data directory
- List<TsFileResource> seqTsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
+ List<TsFileResource> seqTsFiles = getAllFiles(
+ DirectoryManager.getInstance().getAllSequenceFileFolders());
List<TsFileResource> unseqTsFiles =
getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
@@ -222,7 +224,8 @@ public class StorageGroupProcessor {
storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
- recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+ recoverMergeTask
+ .recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
mergingMods.delete();
}
@@ -297,7 +300,7 @@ public class StorageGroupProcessor {
}
// TsFileNameComparator compares TsFiles by the version number in its name
- // ({systemTime}-{versionNum}.tsfile)
+ // ({systemTime}-{versionNum}-{mergeNum}.tsfile)
public int compareFileName(File o1, File o2) {
String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split("-");
String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split("-");
@@ -397,7 +400,8 @@ public class StorageGroupProcessor {
boolean result = tsFileProcessor.insertBatch(batchInsertPlan, indexes, results);
// try to update the latest time of the device of this tsRecord
- if (result && latestTimeForEachDevice.get(batchInsertPlan.getDeviceId()) < batchInsertPlan.getMaxTime()) {
+ if (result && latestTimeForEachDevice.get(batchInsertPlan.getDeviceId()) < batchInsertPlan
+ .getMaxTime()) {
latestTimeForEachDevice.put(batchInsertPlan.getDeviceId(), batchInsertPlan.getMaxTime());
}
@@ -472,8 +476,9 @@ public class StorageGroupProcessor {
e);
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
} catch (IOException e) {
- logger.error("meet IOException when creating TsFileProcessor, change system mode to read-only",
- e);
+ logger
+ .error("meet IOException when creating TsFileProcessor, change system mode to read-only",
+ e);
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
}
return tsFileProcessor;
@@ -490,7 +495,7 @@ public class StorageGroupProcessor {
new File(baseDir, storageGroupName).mkdirs();
String filePath = Paths.get(baseDir, storageGroupName,
- System.currentTimeMillis() + "-" + versionController.nextVersion()).toString()
+ System.currentTimeMillis() + "-" + versionController.nextVersion()).toString() + "-0"
+ TSFILE_SUFFIX;
if (sequence) {
@@ -552,7 +557,7 @@ public class StorageGroupProcessor {
this.latestFlushedTimeForEachDevice.clear();
this.latestTimeForEachDevice.clear();
} catch (IOException e) {
- logger.error("Cannot delete files in storage group {}, because", storageGroupName, e);
+ logger.error("Cannot delete files in storage group {}", storageGroupName, e);
} finally {
writeUnlock();
}
@@ -608,7 +613,8 @@ public class StorageGroupProcessor {
deviceId, measurementId, context);
List<TsFileResource> unseqResources = getFileReSourceListForQuery(unSequenceFileList,
deviceId, measurementId, context);
- QueryDataSource dataSource = new QueryDataSource(new Path(deviceId, measurementId), seqResources, unseqResources);
+ QueryDataSource dataSource = new QueryDataSource(new Path(deviceId, measurementId),
+ seqResources, unseqResources);
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
// is null only in tests
@@ -855,8 +861,10 @@ public class StorageGroupProcessor {
mergeResource.setCacheDeviceMeta(true);
MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(),
- this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), storageGroupName);
- mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
+ this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(),
+ storageGroupName);
+ mergingModification = new ModificationFile(
+ storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
MergeManager.getINSTANCE().submitMainTask(mergeTask);
if (logger.isInfoEnabled()) {
logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
@@ -950,6 +958,219 @@ public class StorageGroupProcessor {
logger.info("{} a merge task ends", storageGroupName);
}
+ /**
+ * Load a new tsfile to storage group processor
+ *
+ * Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
+ * or unsequence list.
+ *
+ * Secondly, execute the loading process by the type.
+ *
+ * Finally, update the latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+ *
+ * @param newTsFile new tsfile
+ * @param newTsFileResource tsfile resource
+ * @UsedBy sync module.
+ */
+ public void loadNewTsFile(File newTsFile, TsFileResource newTsFileResource)
+ throws TsFileProcessorException {
+ writeLock();
+ mergeLock.writeLock().lock();
+ try {
+ boolean isOverlap = false;
+ int preIndex = -1, subsequentIndex = sequenceFileList.size();
+ // check new tsfile
+ outer:
+ for (int i = 0; i < sequenceFileList.size(); i++) {
+ if (i == sequenceFileList.size() - 1 && sequenceFileList.get(i).getEndTimeMap().isEmpty()) {
+ continue;
+ }
+ int preCnt = 0, subsequenceCnt = 0;
+ for (String device : newTsFileResource.getStartTimeMap().keySet()) {
+ if (sequenceFileList.get(i).getStartTimeMap().containsKey(device)) {
+ long startTime1 = sequenceFileList.get(i).getStartTimeMap().get(device);
+ long endTime1 = sequenceFileList.get(i).getEndTimeMap().get(device);
+ long startTime2 = newTsFileResource.getStartTimeMap().get(device);
+ long endTime2 = newTsFileResource.getEndTimeMap().get(device);
+ if (startTime1 > endTime2) {
+ subsequenceCnt++;
+ } else if (startTime2 > endTime1) {
+ preCnt++;
+ } else {
+ isOverlap = true;
+ break outer;
+ }
+ }
+ }
+ if (preCnt != 0 && subsequenceCnt != 0) {
+ isOverlap = true;
+ break;
+ }
+ if (preCnt == 0 && subsequenceCnt != 0) {
+ subsequentIndex = i;
+ break;
+ }
+ if (preCnt != 0 && subsequenceCnt == 0) {
+ preIndex = i;
+ }
+ }
+
+ // loading tsfile by type
+ if (isOverlap) {
+ loadTsFileByType(-1, newTsFile, newTsFileResource, unSequenceFileList.size());
+ } else {
+ if (subsequentIndex != sequenceFileList.size()) {
+ loadTsFileByType(1, newTsFile, newTsFileResource, subsequentIndex);
+ } else {
+ if (preIndex != -1) {
+ loadTsFileByType(1, newTsFile, newTsFileResource, preIndex + 1);
+ } else {
+ loadTsFileByType(1, newTsFile, newTsFileResource, sequenceFileList.size());
+ }
+ }
+ }
+
+ // update latest time map
+ updateLatestTimeMap(newTsFileResource);
+ } catch (TsFileProcessorException e) {
+ logger.error("Failed to append the tsfile {} to storage group processor {}.",
+ newTsFile.getAbsolutePath(), newTsFile.getParentFile().getName());
+ throw new TsFileProcessorException(e);
+ } finally {
+ mergeLock.writeLock().unlock();
+ writeUnlock();
+ }
+ }
+
+ /**
+ * Update latest time in latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+ *
+ * @UsedBy sync module
+ */
+ private void updateLatestTimeMap(TsFileResource newTsFileResource) {
+ for (Entry<String, Long> entry : newTsFileResource.getEndTimeMap().entrySet()) {
+ String device = entry.getKey();
+ long endTime = newTsFileResource.getEndTimeMap().get(device);
+ if(!latestTimeForEachDevice.containsKey(device) || latestTimeForEachDevice.get(device) < endTime){
+ latestTimeForEachDevice.put(device, endTime);
+ }
+ if(!latestFlushedTimeForEachDevice.containsKey(device) || latestFlushedTimeForEachDevice.get(device) < endTime){
+ latestFlushedTimeForEachDevice.put(device, endTime);
+ }
+ }
+ }
+
+ /**
+ * Execute the loading process by the type.
+ *
+ * @param type load type: 1 sequence tsfile ; 2 unsequence tsfile
+ * @param tsFile tsfile to be loaded
+ * @param tsFileResource tsfile resource to be loaded
+ * @param index the index in sequenceFileList/unSequenceFileList
+ * @UsedBy sync module
+ */
+ private void loadTsFileByType(int type, File tsFile, TsFileResource tsFileResource, int index)
+ throws TsFileProcessorException {
+ File targetFile;
+ if (type == -1) {
+ targetFile =
+ new File(tsFile.getParentFile().getParentFile().getParentFile().getParentFile()
+ .getParentFile(), IoTDBConstant.UNSEQUENCE_FLODER_NAME
+ + File.separatorChar + tsFile.getParentFile().getName() + File.separatorChar
+ + tsFile.getName());
+ tsFileResource.setFile(targetFile);
+ unSequenceFileList.add(index, tsFileResource);
+ } else {
+ targetFile =
+ new File(tsFile.getParentFile().getParentFile().getParentFile().getParentFile()
+ .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME
+ + File.separatorChar + tsFile.getParentFile().getName() + File.separatorChar
+ + tsFile.getName());
+ tsFileResource.setFile(targetFile);
+ sequenceFileList.add(index, tsFileResource);
+ }
+
+ // move file from sync dir to data dir
+ if (!targetFile.getParentFile().exists()) {
+ targetFile.getParentFile().mkdirs();
+ }
+ if (!new File(tsFile, TsFileResource.RESOURCE_SUFFIX).exists() && !new File(
+ targetFile, TsFileResource.RESOURCE_SUFFIX).exists()) {
+ throw new TsFileProcessorException(
+ String
+ .format("The new .resource file {%s} to be loaded does not exist.",
+ tsFile.getAbsolutePath()));
+ }
+ if (!new File(targetFile, TsFileResource.RESOURCE_SUFFIX).exists() && !new File(
+ tsFile, TsFileResource.RESOURCE_SUFFIX)
+ .renameTo(new File(targetFile, TsFileResource.RESOURCE_SUFFIX))) {
+ throw new TsFileProcessorException(String.format(
+ "File renaming failed when loading .resource file. Origin: %s, Target: %s",
+ new File(tsFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath(),
+ new File(targetFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath()));
+ }
+ if (!tsFile.exists() && !targetFile.exists()) {
+ throw new TsFileProcessorException(String
+ .format("The new tsfile {%s} to be loaded does not exist.",
+ tsFile.getAbsolutePath()));
+ }
+ if (!targetFile.exists() && !tsFile.renameTo(targetFile)) {
+ throw new TsFileProcessorException(String.format(
+ "File renaming failed when loading tsfile. Origin: %s, Target: %s",
+ tsFile.getAbsolutePath(), targetFile.getAbsolutePath()));
+ }
+ }
+
+ /**
+ * Delete tsfile if it exists which.
+ *
+ * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
+ *
+ * Secondly, delete the tsfile and .resource file.
+ *
+ * @param deletedTsfile tsfile to be deleted
+ * @UsedBy sync module.
+ */
+ public void deleteTsfile(File deletedTsfile) {
+ writeLock();
+ mergeLock.writeLock().lock();
+ TsFileResource deletedTsFileResource = null;
+ try {
+ Iterator<TsFileResource> sequenceIterator = sequenceFileList.iterator();
+ while (sequenceIterator.hasNext()) {
+ TsFileResource sequenceResource = sequenceIterator.next();
+ if (sequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
+ deletedTsFileResource = sequenceResource;
+ sequenceIterator.remove();
+ break;
+ }
+ }
+ if(deletedTsFileResource == null) {
+ Iterator<TsFileResource> unsequenceIterator = unSequenceFileList.iterator();
+ while (unsequenceIterator.hasNext()) {
+ TsFileResource unsequenceResource = unsequenceIterator.next();
+ if (unsequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
+ deletedTsFileResource = unsequenceResource;
+ unsequenceIterator.remove();
+ break;
+ }
+ }
+ }
+ } finally {
+ mergeLock.writeLock().unlock();
+ writeUnlock();
+ }
+ if(deletedTsFileResource == null){
+ return;
+ }
+ deletedTsFileResource.getMergeQueryLock().writeLock().lock();
+ try {
+ deletedTsFileResource.getFile().delete();
+ new File(deletedTsFileResource.getFile(), TsFileResource.RESOURCE_SUFFIX).delete();
+ } finally {
+ deletedTsFileResource.getMergeQueryLock().writeLock().unlock();
+ }
+ }
public TsFileProcessor getWorkSequenceTsFileProcessor() {
return workSequenceTsFileProcessor;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 3a7af7e..1aa2136 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -189,6 +189,10 @@ public class TsFileResource {
return modFile;
}
+ public void setFile(File file) {
+ this.file = file;
+ }
+
public boolean containsDevice(String deviceId) {
return startTimeMap.containsKey(deviceId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index 19f7909..6bd7d49 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.sync.receiver.load;
import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.sync.sender.conf.Constans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,10 +79,15 @@ public class FileLoader implements IFileLoader {
}
}
if (!queue.isEmpty()) {
- handleLoadTask(queue.poll());
+ LoadTask task = queue.poll();
+ try {
+ handleLoadTask(task);
+ }catch (IOException e){
+ LOGGER.error("Can not load task {}", task, e);
+ }
}
}
- } catch (InterruptedException | IOException e) {
+ } catch (InterruptedException e) {
LOGGER.error("Can not handle load task", e);
}
};
@@ -111,32 +119,39 @@ public class FileLoader implements IFileLoader {
public void handleLoadTask(LoadTask task) throws IOException {
switch (task.type) {
case ADD:
- loadDeletedFile(task.file);
+ loadNewTsfile(task.file);
break;
case DELETE:
- loadNewTsfile(task.file);
+ loadDeletedFile(task.file);
break;
default:
LOGGER.error("Wrong load task type {}", task.type);
}
}
- private void loadDeletedFile(File file) throws IOException {
+ private void loadNewTsfile(File newTsFile) throws IOException {
if (curType != LoadType.DELETE) {
loadLog.startLoadDeletedFiles();
curType = LoadType.DELETE;
}
- // TODO load deleted file
- loadLog.finishLoadDeletedFile(file);
+ TsFileResource tsFileResource = new TsFileResource(
+ new File(newTsFile, TsFileResource.RESOURCE_SUFFIX));
+ tsFileResource.deSerialize();
+ try {
+ StorageEngine.getInstance().loadNewTsFile(newTsFile, tsFileResource);
+ } catch (TsFileProcessorException e) {
+ LOGGER.error("Can not load new tsfile {}", newTsFile.getAbsolutePath(), e);
+ }
+ loadLog.finishLoadDeletedFile(newTsFile);
}
- private void loadNewTsfile(File file) throws IOException {
+ private void loadDeletedFile(File deletedTsFile) throws IOException {
if (curType != LoadType.ADD) {
loadLog.startLoadTsFiles();
curType = LoadType.ADD;
}
- // TODO load new tsfile
- loadLog.finishLoadTsfile(file);
+ StorageEngine.getInstance().deleteTsfile(deletedTsFile);
+ loadLog.finishLoadTsfile(deletedTsFile);
}
@@ -165,5 +180,13 @@ public class FileLoader implements IFileLoader {
this.file = file;
this.type = type;
}
+
+ @Override
+ public String toString() {
+ return "LoadTask{" +
+ "file=" + file +
+ ", type=" + type +
+ '}';
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index d0c09b1..3d7a356 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.sync.sender.recover;
+import java.io.IOException;
import java.util.Set;
/**
@@ -27,7 +28,7 @@ import java.util.Set;
*/
public interface ISyncSenderLogAnalyzer {
- void recover();
+ void recover() throws IOException;
void loadLastLocalFiles(Set<String> lastLocalFiles);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
index af4f00c..6eaec50 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -26,6 +26,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.sync.sender.conf.Constans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,18 +34,20 @@ import org.slf4j.LoggerFactory;
public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzer.class);
+ private String senderPath;
private File currentLocalFile;
private File lastLocalFile;
private File syncLogFile;
public SyncSenderLogAnalyzer(String senderPath) {
+ this.senderPath = senderPath;
this.currentLocalFile = new File(senderPath, Constans.CURRENT_LOCAL_FILE_NAME);
this.lastLocalFile = new File(senderPath, Constans.LAST_LOCAL_FILE_NAME);
this.syncLogFile = new File(senderPath, Constans.SYNC_LOG_NAME);
}
@Override
- public void recover() {
+ public void recover() throws IOException {
if (currentLocalFile.exists() && !lastLocalFile.exists()) {
currentLocalFile.renameTo(lastLocalFile);
} else {
@@ -57,6 +60,8 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
lastLocalFiles.addAll(newFiles);
clearLogger(lastLocalFiles);
}
+ FileUtils.deleteDirectory(new File(senderPath, Constans.DATA_SNAPSHOT_NAME));
+ syncLogFile.delete();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 4969e19..6331d2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -257,7 +257,7 @@ public class DataTransferManager implements IDataTransferManager {
}
}
- private void checkRecovery() {
+ private void checkRecovery() throws IOException {
new SyncSenderLogAnalyzer(config.getSenderFolderPath()).recover();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
index 6b65776..c30eb8f 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
@@ -100,6 +100,31 @@ public class SyncSenderLogAnalyzerTest {
assert lastFilesMap.get(entry.getKey()).size() == entry.getValue().size();
assert lastFilesMap.get(entry.getKey()).containsAll(entry.getValue());
}
+
+ // delete some files
+ assert !new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME).exists();
+ senderLogger = new SyncSenderLogger(
+ new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME));
+ manager.getValidFiles(dataDir);
+ assert !isEmpty(manager.getLastLocalFilesMap());
+ senderLogger.startSyncDeletedFilesName();
+ for(Set<File> newTsFiles:allFileList.values()){
+ for(File file: newTsFiles){
+ senderLogger.finishSyncDeletedFileName(file);
+ }
+ }
+ senderLogger.close();
+ // recover log
+ senderLogAnalyzer.recover();
+ manager.getValidFiles(dataDir);
+ assert isEmpty(manager.getLastLocalFilesMap());
+ assert isEmpty(manager.getDeletedFilesMap());
+ Map<String, Set<File>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
+ for (Entry<String, Set<File>> entry : allFileList.entrySet()) {
+ assert toBeSyncedFilesMap.containsKey(entry.getKey());
+ assert toBeSyncedFilesMap.get(entry.getKey()).size() == entry.getValue().size();
+ assert toBeSyncedFilesMap.get(entry.getKey()).containsAll(entry.getValue());
+ }
}
private boolean isEmpty(Map<String, Set<File>> sendingFileList) {