You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/22 03:14:15 UTC
[incubator-iotdb] 04/05: complete sync sender module
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit b15ec6bf1bc5dcd9779cfc6e1de5b9afbca69691
Author: lta <li...@163.com>
AuthorDate: Thu Aug 22 11:11:56 2019 +0800
complete sync sender module
---
.../org/apache/iotdb/db/sync/package-info.java | 19 +
.../db/sync/receiver/transfer/SyncServiceImpl.java | 302 ----------------
.../apache/iotdb/db/sync/sender/conf/Constans.java | 15 +-
.../db/sync/sender/manage/ISyncFileManager.java | 24 ++
.../db/sync/sender/manage/SyncFileManager.java | 47 ++-
.../sender/recover/ISyncSenderLogAnalyzer.java | 9 +-
.../db/sync/sender/recover/ISyncSenderLogger.java | 26 ++
.../sync/sender/recover/SyncSenderLogAnalyzer.java | 3 +-
.../sync/sender/transfer/DataTransferManager.java | 397 ++++++++++++---------
.../sync/sender/transfer/IDataTransferManager.java | 42 ++-
10 files changed, 377 insertions(+), 507 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/package-info.java b/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
index 97a4ec5..f3110e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
@@ -17,4 +17,23 @@
* under the License.
*/
+/**
+ * <p>
+ * Package Sync is a tool suite that periodically uploads persistent tsfiles from the sender disk to
+ * the receiver and loads them. Together with the merge module, write, update and delete
+ * operations can all be synchronized.
+ *
+ * On the sender side of the sync, the sync module is a separate process, independent of the IoTDB
+ * process. It can be started and closed through a separate script.
+ *
+ * On the receiver side of the sync, the sync module is embedded in the engine of IoTDB and is in
+ * the same process with IoTDB. The receiver module listens for a separate port. Before using it, it
+ * needs to set up a whitelist at the sync receiver, which is expressed as a network segment. The
+ * receiver only accepts the data transferred from the sender located in the whitelist segment.
+ *
+ * Because the IoTDB system supports multiple data file directories, every complete synchronization
+ * task is split into sub-tasks according to disks. Hard links are needed in the execution process
+ * and they can not be created across disk partitions, so the sub-tasks are performed in turn, disk
+ * by disk.
+ */
package org.apache.iotdb.db.sync;
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 02bb5b6..61ff2ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -391,305 +391,6 @@ public class SyncServiceImpl implements SyncService.Iface {
}
/**
- * Get all tsfiles' info which are sent from sender, it is preparing for merging these data
- */
- public void getFileNodeInfo() throws IOException {
- File dataFileRoot = new File(syncDataPath);
- File[] files = dataFileRoot.listFiles();
- int processedNum = 0;
- for (File storageGroupPB : files) {
- List<String> filesPath = new ArrayList<>();
- File[] filesSG = storageGroupPB.listFiles();
- for (File fileTF : filesSG) { // fileTF means TsFiles
- Map<String, Long> startTimeMap = new HashMap<>();
- Map<String, Long> endTimeMap = new HashMap<>();
- TsFileSequenceReader reader = null;
- try {
- reader = new TsFileSequenceReader(fileTF.getPath());
- Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
- Iterator<String> it = deviceIdMap.keySet().iterator();
- while (it.hasNext()) {
- String key = it.next();
- TsDeviceMetadataIndex device = deviceIdMap.get(key);
- startTimeMap.put(key, device.getStartTime());
- endTimeMap.put(key, device.getEndTime());
- }
- } catch (IOException e) {
- logger.error("Unable to read tsfile {}", fileTF.getPath());
- throw new IOException(e);
- } finally {
- try {
- if (reader != null) {
- reader.close();
- }
- } catch (IOException e) {
- logger.error("Cannot close tsfile stream {}", fileTF.getPath());
- throw new IOException(e);
- }
- }
- fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
- fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap);
- filesPath.add(fileTF.getPath());
- processedNum++;
- logger.info(String
- .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get()));
- fileNodeMap.get().put(storageGroupPB.getName(), filesPath);
- }
- }
- }
-
-
- /**
- * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group
- * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the
- * possibility of updating historical data.
- */
- public void loadData() throws StorageEngineException {
- syncDataPath = FilePathUtils.regularizePath(syncDataPath);
- int processedNum = 0;
- for (String storageGroup : fileNodeMap.get().keySet()) {
- List<String> filesPath = fileNodeMap.get().get(storageGroup);
- /** before load external tsFile, it is necessary to order files in the same storage group **/
- Collections.sort(filesPath, (o1, o2) -> {
- Map<String, Long> startTimePath1 = fileNodeStartTime.get().get(o1);
- Map<String, Long> endTimePath2 = fileNodeEndTime.get().get(o2);
- for (Entry<String, Long> entry : endTimePath2.entrySet()) {
- if (startTimePath1.containsKey(entry.getKey())) {
- if (startTimePath1.get(entry.getKey()) > entry.getValue()) {
- return 1;
- } else {
- return -1;
- }
- }
- }
- return 0;
- });
-
- for (String path : filesPath) {
- // get startTimeMap and endTimeMap
- Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
- Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
-
- // create a new fileNode
- String header = syncDataPath;
- String relativePath = path.substring(header.length());
- TsFileResource fileNode = new TsFileResource(
- new File(DirectoryManager.getInstance().getNextFolderIndexForSequenceFile() +
- File.separator + relativePath), startTimeMap, endTimeMap
- );
- // call interface of load external file
- try {
- if (!STORAGE_GROUP_MANAGER.appendFileToStorageGroupProcessor(storageGroup, fileNode, path)) {
- // it is a file with unsequence data
- if (config.isUpdateHistoricalDataPossibility()) {
- loadOldData(path);
- } else {
- List<String> overlapFiles = STORAGE_GROUP_MANAGER.getOverlapFiles(
- storageGroup,
- fileNode, uuid.get());
- if (overlapFiles.isEmpty()) {
- loadOldData(path);
- } else {
- loadOldData(path, overlapFiles);
- }
- }
- }
- } catch (StorageEngineException | IOException | ProcessorException e) {
- logger.error("Can not load external file {}", path);
- throw new StorageEngineException(e);
- }
- processedNum++;
- logger.info(String
- .format("Merging files has completed : %d/%d", processedNum, fileNum.get()));
- }
- }
- }
-
- /**
- * Insert all data in the tsfile into IoTDB.
- */
- public void loadOldData(String filePath) throws IOException, ProcessorException {
- Set<String> timeseriesSet = new HashSet<>();
- TsFileSequenceReader reader = null;
- QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
- try {
- /** use tsfile reader to get data **/
- reader = new TsFileSequenceReader(filePath);
- Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
- Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet()
- .iterator();
- while (entryIterator.hasNext()) {
- Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next();
- String deviceId = deviceMIEntry.getKey();
- TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue();
- TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
- List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList();
- timeseriesSet.clear();
- /** firstly, get all timeseries in the same device **/
- for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) {
- List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData
- .getChunkMetaDataList();
- for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
- String measurementUID = chunkMetaData.getMeasurementUid();
- measurementUID = deviceId + "." + measurementUID;
- timeseriesSet.add(measurementUID);
- }
- }
- /** Secondly, use tsFile Reader to form InsertPlan **/
- ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
- List<Path> paths = new ArrayList<>();
- paths.clear();
- for (String timeseries : timeseriesSet) {
- paths.add(new Path(timeseries));
- }
- QueryExpression queryExpression = QueryExpression.create(paths, null);
- QueryDataSet queryDataSet = readTsFile.query(queryExpression);
- while (queryDataSet.hasNext()) {
- RowRecord record = queryDataSet.next();
- List<Field> fields = record.getFields();
- List<String> measurementList = new ArrayList<>();
- List<String> insertValues = new ArrayList<>();
- for (int i = 0; i < fields.size(); i++) {
- Field field = fields.get(i);
- if (!field.isNull()) {
- measurementList.add(paths.get(i).getMeasurement());
- if (fields.get(i).getDataType() == TSDataType.TEXT) {
- insertValues.add(String.format("'%s'", field.toString()));
- } else {
- insertValues.add(String.format("%s", field.toString()));
- }
- }
- }
- if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
- measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
- throw new IOException("Inserting series data to IoTDB engine has failed.");
- }
- }
- }
- } catch (IOException e) {
- logger.error("Can not parse tsfile into SQL", e);
- throw new IOException(e);
- } catch (ProcessorException e) {
- logger.error("Meet error while processing non-query.");
- throw new ProcessorException(e);
- } finally {
- try {
- if (reader != null) {
- reader.close();
- }
- } catch (IOException e) {
- logger.error("Cannot close file stream {}", filePath, e);
- }
- }
- }
-
- /**
- * Insert those valid data in the tsfile into IoTDB
- *
- * @param overlapFiles:files which are conflict with the sync file
- */
- public void loadOldData(String filePath, List<String> overlapFiles)
- throws IOException, ProcessorException {
- Set<String> timeseriesList = new HashSet<>();
- QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
- Map<String, ReadOnlyTsFile> tsfilesReaders = openReaders(filePath, overlapFiles);
- try {
- TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
- Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
- Iterator<String> it = deviceIdMap.keySet().iterator();
- while (it.hasNext()) {
- String deviceID = it.next();
- TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID);
- TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
- List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
- .getChunkGroupMetaDataList();
- timeseriesList.clear();
- /** firstly, get all timeseries in the same device **/
- for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
- List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
- for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) {
- String measurementUID = timeSeriesChunkMetaData.getMeasurementUid();
- measurementUID = deviceID + "." + measurementUID;
- timeseriesList.add(measurementUID);
- }
- }
- reader.close();
-
- /** secondly, use tsFile Reader to form SQL **/
- ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
- List<Path> paths = new ArrayList<>();
- /** compare data with one timeseries in a round to get valid data **/
- for (String timeseries : timeseriesList) {
- paths.clear();
- paths.add(new Path(timeseries));
- Set<InsertPlan> originDataPoints = new HashSet<>();
- QueryExpression queryExpression = QueryExpression.create(paths, null);
- QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
- Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet, paths, deviceID);
-
- /** get all data with the timeseries in all overlap files. **/
- for (String overlapFile : overlapFiles) {
- ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
- QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(queryExpression);
- originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap, paths, deviceID));
- }
-
- /** If there has no overlap data with the timeseries, inserting all data in the sync file **/
- if (originDataPoints.isEmpty()) {
- for (InsertPlan insertPlan : newDataPoints) {
- if (insertExecutor.insert(insertPlan)) {
- throw new IOException("Inserting series data to IoTDB engine has failed.");
- }
- }
- } else {
- /** Compare every data to get valid data **/
- for (InsertPlan insertPlan : newDataPoints) {
- if (!originDataPoints.contains(insertPlan)) {
- if (insertExecutor.insert(insertPlan)) {
- throw new IOException("Inserting series data to IoTDB engine has failed.");
- }
- }
- }
- }
- }
- }
- } catch (IOException e) {
- logger.error("Can not parse tsfile into SQL", e);
- throw new IOException(e);
- } catch (ProcessorException e) {
- logger.error("Meet error while processing non-query.", e);
- throw new ProcessorException(e);
- } finally {
- try {
- closeReaders(tsfilesReaders);
- } catch (IOException e) {
- logger.error("Cannot close file stream {}", filePath, e);
- }
- }
- }
-
- private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet, List<Path> paths, String deviceID) throws IOException {
- Set<InsertPlan> plans = new HashSet<>();
- while (queryDataSet.hasNext()) {
- RowRecord record = queryDataSet.next();
- List<Field> fields = record.getFields();
- /** get all data with the timeseries in the sync file **/
- for (int i = 0; i < fields.size(); i++) {
- Field field = fields.get(i);
- String[] measurementList = new String[1];
- if (!field.isNull()) {
- measurementList[0] = paths.get(i).getMeasurement();
- InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(),
- measurementList, new String[]{field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString())
- : field.toString()});
- plans.add(insertPlan);
- }
- }
- }
- return plans;
- }
-
- /**
* Open all tsfile reader and cache
*/
private Map<String, ReadOnlyTsFile> openReaders(String filePath, List<String> overlapFiles)
@@ -716,11 +417,8 @@ public class SyncServiceImpl implements SyncService.Iface {
*/
@Override
public void cleanUp() {
- uuid.remove();
fileNum.remove();
fileNodeMap.remove();
- fileNodeStartTime.remove();
- fileNodeEndTime.remove();
schemaFromSenderPath.remove();
try {
FileUtils.deleteDirectory(new File(syncFolderPath));
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index dfe3c52..dffa1e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -27,15 +27,22 @@ public class Constans {
public static final String SYNC_SENDER = "sync-sender";
public static final String SYNC_RECEIVER = "sync-receiver";
+ public static final String MESSAGE_DIGIT_NAME = "MD5";
+ public static final String SYNC_DIR_NAME_SEPARATOR = "_";
+
+ // sender section
+
public static final String LOCK_FILE_NAME = "sync_lock";
+
public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
+
public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
+
public static final String CURRENT_LOCAL_FILE_NAME = "current_local_files.txt";
+
public static final String DATA_SNAPSHOT_NAME = "snapshot";
- public static final String SYNC_LOG_NAME = "sync.log";
- public static final String MESSAGE_DIGIT_NAME = "MD5";
- public static final String SYNC_DIR_NAME_SEPARATOR = "_";
+ public static final String SYNC_LOG_NAME = "sync.log";
/**
* Split data file , block size at each transmission
@@ -43,7 +50,7 @@ public class Constans {
public static final int DATA_CHUNK_SIZE = 64 * 1024 * 1024;
/**
- * Max try when syncing the same file to receiver fails.
+ * Maximum number of tries when syncing the same file to receiver fails.
*/
public static final int MAX_SYNC_FILE_TRY = 5;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index 1a529be..684db79 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -21,11 +21,35 @@ package org.apache.iotdb.db.sync.sender.manage;
import java.io.File;
import java.io.IOException;
+/**
+ * This interface is used to manage deleted files and new closed files that need to be synchronized in each
+ * sync task.
+ */
public interface ISyncFileManager {
+ /**
+ * Find out all closed and unmodified files, i.e. files that have a .resource file and do not
+ * have a .mod file. For these files, they will eventually generate a new tsfile as the merge
+ * operation is executed and executed in subsequent synchronization tasks.
+ *
+ * @param dataDir data directory
+ */
void getCurrentLocalFiles(String dataDir);
+ /**
+ * Load last local files from file {@code lastLocalFile}, which does not contain the tsfiles that
+ * were not synced successfully in previous sync tasks.
+ *
+ * @param lastLocalFile last local file, which may not exist in first sync task.
+ */
void getLastLocalFiles(File lastLocalFile) throws IOException;
+ /**
+ * Based on current local files and last local files, we can distinguish two kinds of files
+ * between them, one is deleted files, the other is new files. These two kinds of files are valid
+ * files that need to be synchronized to the receiving end.
+ *
+ * @param dataDir data directory
+ */
void getValidFiles(String dataDir) throws IOException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index c08b64f..23af20c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -39,12 +39,32 @@ public class SyncFileManager implements ISyncFileManager {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncFileManager.class);
+ /**
+ * All storage groups on the disk where the current sync task is executed
+ */
+ private Set<String> allSG;
+
+ /**
+ * Key is storage group, value is the set of current sealed tsfile in the sg.
+ */
private Map<String, Set<File>> currentSealedLocalFilesMap;
+ /**
+ * Key is storage group, value is the set of last local tsfiles in the sg, which doesn't contain
+ * those tsfiles which are not synced successfully.
+ */
private Map<String, Set<File>> lastLocalFilesMap;
+ /**
+ * Key is storage group, value is the valid set of deleted tsfiles which need to be synced to
+ * receiver end in the sg.
+ */
private Map<String, Set<File>> deletedFilesMap;
+ /**
+ * Key is storage group, value is the valid set of new tsfiles which need to be synced to
+ * receiver end in the sg.
+ */
private Map<String, Set<File>> toBeSyncedFilesMap;
private SyncFileManager() {
@@ -58,15 +78,18 @@ public class SyncFileManager implements ISyncFileManager {
@Override
public void getCurrentLocalFiles(String dataDir) {
LOGGER.info("Start to get current local files in data folder {}", dataDir);
+
// get all files in data dir sequence folder
Map<String, Set<File>> currentAllLocalFiles = new HashMap<>();
File[] allSGFolders = new File(
dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME)
.listFiles();
for (File sgFolder : allSGFolders) {
+ allSG.add(sgFolder.getName());
currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashSet<>());
- Arrays.stream(sgFolder.listFiles()).forEach(file -> currentAllLocalFiles.get(sgFolder.getName())
- .add(new File(sgFolder.getAbsolutePath(), file.getName())));
+ Arrays.stream(sgFolder.listFiles())
+ .forEach(file -> currentAllLocalFiles.get(sgFolder.getName())
+ .add(new File(sgFolder.getAbsolutePath(), file.getName())));
}
// get sealed tsfiles
@@ -92,10 +115,14 @@ public class SyncFileManager implements ISyncFileManager {
LOGGER.info("Start to get last local files from last local file info {}",
lastLocalFileInfo.getAbsoluteFile());
lastLocalFilesMap = new HashMap<>();
+ if(!lastLocalFileInfo.exists()){
+ return;
+ }
try (BufferedReader reader = new BufferedReader(new FileReader(lastLocalFileInfo))) {
String fileName;
while ((fileName = reader.readLine()) != null) {
String sgName = new File(fileName).getParent();
+ allSG.add(sgName);
lastLocalFilesMap.putIfAbsent(sgName, new HashSet<>());
lastLocalFilesMap.get(sgName).add(new File(fileName));
}
@@ -104,21 +131,21 @@ public class SyncFileManager implements ISyncFileManager {
@Override
public void getValidFiles(String dataDir) throws IOException {
+ allSG = new HashSet<>();
getCurrentLocalFiles(dataDir);
getLastLocalFiles(new File(SyncSenderDescriptor.getInstance().getConfig().getLastFileInfo()));
toBeSyncedFilesMap = new HashMap<>();
deletedFilesMap = new HashMap<>();
- for(Entry<String, Set<File>> entry: currentSealedLocalFilesMap.entrySet()){
- String sgName = entry.getKey();
+ for (String sgName : allSG) {
toBeSyncedFilesMap.putIfAbsent(sgName, new HashSet<>());
deletedFilesMap.putIfAbsent(sgName, new HashSet<>());
- for(File newFile:currentSealedLocalFilesMap.get(sgName)){
- if(!lastLocalFilesMap.get(sgName).contains(newFile)){
+ for (File newFile : currentSealedLocalFilesMap.getOrDefault(sgName, new HashSet<>())) {
+ if (!lastLocalFilesMap.getOrDefault(sgName, new HashSet<>()).contains(newFile)) {
toBeSyncedFilesMap.get(sgName).add(newFile);
}
}
- for(File oldFile:lastLocalFilesMap.get(sgName)){
- if(!currentSealedLocalFilesMap.get(sgName).contains(oldFile)){
+ for (File oldFile : lastLocalFilesMap.getOrDefault(sgName, new HashSet<>())) {
+ if (!currentSealedLocalFilesMap.getOrDefault(sgName, new HashSet<>()).contains(oldFile)) {
deletedFilesMap.get(sgName).add(oldFile);
}
}
@@ -137,6 +164,10 @@ public class SyncFileManager implements ISyncFileManager {
return toBeSyncedFilesMap;
}
+ public Set<String> getAllSG() {
+ return allSG;
+ }
+
private static class SyncFileManagerHolder {
private static final SyncFileManager INSTANCE = new SyncFileManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index 53cb54f..d0c09b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -20,14 +20,19 @@ package org.apache.iotdb.db.sync.sender.recover;
import java.util.Set;
+/**
+ * This interface is used to restore and clean up the status of the historical synchronization task
+ * with abnormal termination. Through the analysis of the synchronization task log, the completed
+ * progress is merged to prepare for the next synchronization task.
+ */
public interface ISyncSenderLogAnalyzer {
+ void recover();
+
void loadLastLocalFiles(Set<String> lastLocalFiles);
void loadLogger(Set<String> deletedFiles, Set<String> newFiles);
- void recover();
-
void clearLogger(Set<String> currentLocalFiles);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
index d1cdf9e..0229579 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -21,14 +21,40 @@ package org.apache.iotdb.db.sync.sender.recover;
import java.io.File;
import java.io.IOException;
+/**
+ * This interface is used to log progress in the process of synchronization tasks. If the
+ * synchronization tasks are completed normally and there are no exceptions, the log records will be
+ * deleted; otherwise, the status can be restored according to the log at the start of each task. It
+ * ensures the correctness of the synchronization module when a system crash or network abnormality
+ * occurs.
+ */
public interface ISyncSenderLogger {
+ /**
+ * Start sync deleted files name
+ * @throws IOException
+ */
void startSyncDeletedFilesName() throws IOException;
+ /**
+ * After a deleted file name is synced to the receiver end, record it in sync log.
+ * @param file the deleted tsfile
+ * @throws IOException
+ */
void finishSyncDeletedFileName(File file) throws IOException;
+ /**
+ * Start sync new tsfiles
+ * @throws IOException
+ */
void startSyncTsFiles() throws IOException;
+ /**
+ *
+ * After a new tsfile is synced to the receiver end, record it in sync log.
+ * @param file new tsfile
+ * @throws IOException
+ */
void finishSyncTsfile(File file) throws IOException;
void close() throws IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
index 9649806..3acfd66 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.sync.sender.conf.Constans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer{
+public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzer.class);
private File currentLocalFile;
@@ -109,6 +109,7 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer{
} catch (IOException e) {
LOGGER.error("Can not clear sync log {}", syncLogFile.getAbsoluteFile(), e);
}
+ lastLocalFile.deleteOnExit();
currentLocalFile.renameTo(lastLocalFile);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index fc68960..12803f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -1,4 +1,21 @@
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.sync.sender.transfer;
import java.io.BufferedReader;
@@ -19,15 +36,18 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.sync.sender.conf.Constans;
@@ -60,30 +80,38 @@ public class DataTransferManager implements IDataTransferManager {
private static final int BATCH_LINE = 1000;
+ /**
+ * When transferring schema information, it is a better choice to transfer only new schema
+ * information, avoiding duplicate data transmission. The schema log is self-increasing, so the
+ * location is recorded once after each synchronization task for the next synchronization task to
+ * use.
+ */
private int schemaFileLinePos;
private TTransport transport;
private SyncService.Client serviceClient;
- /**
- * Files that need to be synchronized
- */
+ private Set<String> allSG;
+
private Map<String, Set<File>> toBeSyncedFilesMap;
private Map<String, Set<File>> deletedFilesMap;
- private Map<String, Set<File>> sucessSyncedFilesMap;
+ private Map<String, Set<File>> lastLocalFilesMap;
- private Map<String, Set<File>> successDeletedFilesMap;
+ private Map<String, Set<File>> successSyncedFilesMap = new HashMap<>();
- private Map<String, Set<File>> lastLocalFilesMap;
+ private Map<String, Set<File>> successDeletedFilesMap = new HashMap<>();
/**
* If true, sync is in execution.
**/
private volatile boolean syncStatus = false;
+ /**
+ * Record sync progress in log.
+ */
private SyncSenderLogger syncLog;
private SyncFileManager syncFileManager = SyncFileManager.getInstance();
@@ -99,7 +127,7 @@ public class DataTransferManager implements IDataTransferManager {
}
/**
- * Create a sender and sync files to the receiver.
+ * Create a sender and sync files to the receiver periodically.
*/
public static void main(String[] args) throws IOException {
Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
@@ -109,10 +137,9 @@ public class DataTransferManager implements IDataTransferManager {
fileSenderImpl.startTimedTask();
}
-
/**
- * The method is to verify whether the client lock file is locked or not, ensuring that only one
- * client is running.
+ * Verify whether the client lock file is locked or not, ensuring that only one client is
+ * running.
*/
private void verifySingleton() throws IOException {
File lockFile = new File(config.getLockFilePath());
@@ -165,7 +192,7 @@ public class DataTransferManager implements IDataTransferManager {
}
/**
- * Start Monitor Thread, monitor sync status
+ * Start monitor thread, which monitors sync status.
*/
private void startMonitor() {
executorService.scheduleWithFixedDelay(() -> {
@@ -195,11 +222,12 @@ public class DataTransferManager implements IDataTransferManager {
executorService = null;
}
+ @Override
public void syncAll() throws SyncConnectionException, IOException, TException {
// 1. Connect to sync receiver and confirm identity
establishConnection(config.getServerIp(), config.getServerPort());
- if (!confirmIdentity(config.getSenderPath())) {
+ if (!confirmIdentity()) {
logger.error("Sorry, you do not have the permission to connect to sync receiver.");
System.exit(1);
}
@@ -212,16 +240,18 @@ public class DataTransferManager implements IDataTransferManager {
for (String dataDir : dataDirs) {
logger.info("Start to sync data in data dir {}", dataDir);
config.update(dataDir);
- SyncFileManager.getInstance().getValidFiles(dataDir);
- lastLocalFilesMap = SyncFileManager.getInstance().getLastLocalFilesMap();
- deletedFilesMap = SyncFileManager.getInstance().getDeletedFilesMap();
- toBeSyncedFilesMap = SyncFileManager.getInstance().getToBeSyncedFilesMap();
+ syncFileManager.getValidFiles(dataDir);
+ allSG = syncFileManager.getAllSG();
+ lastLocalFilesMap = syncFileManager.getLastLocalFilesMap();
+ deletedFilesMap = syncFileManager.getDeletedFilesMap();
+ toBeSyncedFilesMap = syncFileManager.getToBeSyncedFilesMap();
checkRecovery();
if (SyncUtils.isEmpty(deletedFilesMap) && SyncUtils.isEmpty(toBeSyncedFilesMap)) {
logger.info("There has no data to sync in data dir {}", dataDir);
continue;
}
sync();
+ endSync();
logger.info("Finish to sync data in data dir {}", dataDir);
}
@@ -236,31 +266,161 @@ public class DataTransferManager implements IDataTransferManager {
}
}
+ private void checkRecovery() {
+ new SyncSenderLogAnalyzer(config.getSenderPath()).recover();
+ }
+
+ @Override
+ public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
+ transport = new TSocket(serverIp, serverPort);
+ TProtocol protocol = new TBinaryProtocol(transport);
+ serviceClient = new SyncService.Client(protocol);
+ try {
+ transport.open();
+ } catch (TTransportException e) {
+ logger.error("Cannot connect to the receiver.");
+ throw new SyncConnectionException(e);
+ }
+ }
+
+ @Override
+ public boolean confirmIdentity() throws SyncConnectionException {
+ try {
+ return serviceClient.checkIdentity(InetAddress.getLocalHost().getHostAddress())
+ == ResultStatus.SUCCESS;
+ } catch (Exception e) {
+ logger.error("Cannot confirm identity with the receiver.");
+ throw new SyncConnectionException(e);
+ }
+ }
+
+ @Override
+ public void syncSchema() throws SyncConnectionException, TException {
+ int retryCount = 0;
+ serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
+ while (true) {
+ if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
+ throw new SyncConnectionException(String
+ .format("Can not sync schema after %s retries.", Constans.MAX_SYNC_FILE_TRY));
+ }
+ try {
+ if (tryToSyncSchema()) {
+ writeSyncSchemaPos(getSchemaPosFile());
+ break;
+ }
+ } finally {
+ retryCount++;
+ }
+ }
+ }
+
+ private boolean tryToSyncSchema() {
+ int schemaPos = readSyncSchemaPos(getSchemaPosFile());
+
+ // start to sync file data and get md5 of this file.
+ try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
+ schemaFileLinePos = 0;
+ while (schemaFileLinePos++ <= schemaPos) {
+ br.readLine();
+ }
+ MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+ String line;
+ int cntLine = 0;
+ while ((line = br.readLine()) != null) {
+ schemaFileLinePos++;
+ byte[] singleLineData = BytesUtils.stringToBytes(line);
+ bos.write(singleLineData);
+ md.update(singleLineData);
+ if (cntLine++ == BATCH_LINE) {
+ ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+ bos.reset();
+ if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
+ logger.error("Receiver failed to receive metadata, retry.");
+ return false;
+ }
+ cntLine = 0;
+ }
+ }
+ if (bos.size() != 0) {
+ ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+ bos.reset();
+ if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
+ logger.error("Receiver failed to receive metadata, retry.");
+ return false;
+ }
+ }
+
+ // check md5
+ return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
+ } catch (NoSuchAlgorithmException | IOException | TException e) {
+ logger.error("Can not finish transfer schema to receiver", e);
+ return false;
+ }
+ }
+
/**
- * Execute a sync task.
+ * Check MD5 of schema to make sure that the receiver receives the schema correctly
*/
+ private boolean checkMD5ForSchema(String md5OfSender) throws TException {
+ String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
+ if (md5OfSender.equals(md5OfReceiver)) {
+ logger.info("Receiver has received schema successfully, retry.");
+ return true;
+ } else {
+ logger
+ .error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+ return false;
+ }
+ }
+
+ private int readSyncSchemaPos(File syncSchemaLogFile) {
+ try {
+ if (syncSchemaLogFile.exists()) {
+ try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
+ return Integer.parseInt(br.readLine());
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
+ }
+ return 0;
+ }
+
+ private void writeSyncSchemaPos(File syncSchemaLogFile) {
+ try {
+ if (!syncSchemaLogFile.exists()) {
+ syncSchemaLogFile.createNewFile();
+ }
+ try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
+ br.write(Integer.toString(schemaFileLinePos));
+ }
+ } catch (IOException e) {
+ logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
+ }
+ }
+
@Override
public void sync() throws IOException {
try {
syncStatus = true;
syncLog = new SyncSenderLogger(getSchemaLogFile());
+ successSyncedFilesMap = new HashMap<>();
+ successDeletedFilesMap = new HashMap<>();
- // 1. Sync data
- for (Entry<String, Set<File>> entry : deletedFilesMap.entrySet()) {
- checkRecovery();
+ for (String sgName : allSG) {
syncLog = new SyncSenderLogger(getSyncLogFile());
- // TODO deal with the situation
try {
- if (serviceClient.init(entry.getKey()) == ResultStatus.FAILURE) {
+ if (serviceClient.init(sgName) == ResultStatus.FAILURE) {
throw new SyncConnectionException("unable init receiver");
}
} catch (TException | SyncConnectionException e) {
throw new SyncConnectionException("Unable to connect to receiver", e);
}
- logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
- syncDeletedFilesName(entry.getKey(), entry.getValue());
- syncDataFilesInOneGroup(entry.getKey(), entry.getValue());
- clearSyncLog();
+ logger.info("Sync process starts to transfer data of storage group {}", sgName);
+ syncDeletedFilesNameInOneGroup(sgName,
+ deletedFilesMap.getOrDefault(sgName, new HashSet<>()));
+ syncDataFilesInOneGroup(sgName, toBeSyncedFilesMap.getOrDefault(sgName, new HashSet<>()));
}
} catch (SyncConnectionException e) {
@@ -273,12 +433,9 @@ public class DataTransferManager implements IDataTransferManager {
}
}
- private void checkRecovery() {
- new SyncSenderLogAnalyzer(config.getSenderPath()).recover();
- }
-
@Override
- public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) throws IOException {
+ public void syncDeletedFilesNameInOneGroup(String sgName, Set<File> deletedFilesName)
+ throws IOException {
if (deletedFilesName.isEmpty()) {
logger.info("There has no deleted files to be synced in storage group {}", sgName);
return;
@@ -313,7 +470,8 @@ public class DataTransferManager implements IDataTransferManager {
try {
snapshotFile = makeFileSnapshot(tsfile);
syncSingleFile(snapshotFile);
- sucessSyncedFilesMap.get(sgName).add(tsfile);
+ syncSingleFile(new File(snapshotFile, TsFileResource.RESOURCE_SUFFIX));
+ successSyncedFilesMap.get(sgName).add(tsfile);
syncLog.finishSyncTsfile(tsfile);
logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
} catch (IOException e) {
@@ -329,6 +487,11 @@ public class DataTransferManager implements IDataTransferManager {
logger.info("Sync process has finished storage group {}.", sgName);
}
+ /**
+ * Make a snapshot (hard link) of the new tsfile and of its resource file.
+ *
+ * @param file new tsfile to be synced
+ */
private File makeFileSnapshot(File file) throws IOException {
File snapshotFile = SyncUtils.getSnapshotFile(file);
if (!snapshotFile.getParentFile().exists()) {
@@ -337,11 +500,16 @@ public class DataTransferManager implements IDataTransferManager {
Path link = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
Path target = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
Files.createLink(link, target);
+ link = FileSystems.getDefault()
+ .getPath(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+ target = FileSystems.getDefault()
+ .getPath(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+ Files.createLink(link, target);
return snapshotFile;
}
/**
- * Transfer data of a storage group to receiver.
+ * Transfer data of a tsfile to the receiver.
*/
private void syncSingleFile(File snapshotFile) throws SyncConnectionException {
try {
@@ -387,156 +555,18 @@ public class DataTransferManager implements IDataTransferManager {
}
}
-
- /**
- * Establish a connection between the sender and the receiver.
- *
- * @param serverIp the ip address of the receiver
- * @param serverPort must be same with port receiver set.
- */
- @Override
- public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
- transport = new TSocket(serverIp, serverPort);
- TProtocol protocol = new TBinaryProtocol(transport);
- serviceClient = new SyncService.Client(protocol);
- try {
- transport.open();
- } catch (TTransportException e) {
- logger.error("Cannot connect to server");
- throw new SyncConnectionException(e);
- }
- }
-
- /**
- * UUID marks the identity of sender for receiver.
- */
- @Override
- public boolean confirmIdentity(String uuidPath) throws SyncConnectionException {
- try {
- return serviceClient.checkIdentity(InetAddress.getLocalHost().getHostAddress())
- == ResultStatus.SUCCESS;
- } catch (Exception e) {
- logger.error("Cannot confirm identity with receiver");
- throw new SyncConnectionException(e);
- }
- }
-
- /**
- * Sync schema with receiver.
- */
- @Override
- public void syncSchema() throws SyncConnectionException, TException {
- int retryCount = 0;
- serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
- while (true) {
- if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
- throw new SyncConnectionException(String
- .format("Can not sync schema after %s tries.", Constans.MAX_SYNC_FILE_TRY));
- }
- try {
- if (tryToSyncSchema()) {
- writeSyncSchemaPos(getSchemaPosFile());
- break;
- }
- } finally {
- retryCount++;
- }
- }
- }
-
- private boolean tryToSyncSchema() {
- int schemaPos = readSyncSchemaPos(getSchemaPosFile());
-
- // start to sync file data and get md5 of this file.
- try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
- schemaFileLinePos = 0;
- while (schemaFileLinePos++ <= schemaPos) {
- br.readLine();
- }
- MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
- String line;
- int cntLine = 0;
- while ((line = br.readLine()) != null) {
- schemaFileLinePos++;
- byte[] singleLineData = BytesUtils.stringToBytes(line);
- bos.write(singleLineData);
- md.update(singleLineData);
- if (cntLine++ == BATCH_LINE) {
- ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
- bos.reset();
- // PROCESSING_STATUS represents there is still schema buffer to send.
- if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
- logger.error("Receiver failed to receive metadata, retry.");
- return false;
- }
- cntLine = 0;
- }
- }
- if (bos.size() != 0) {
- ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
- bos.reset();
- if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
- logger.error("Receiver failed to receive metadata, retry.");
- return false;
- }
- }
-
- // check md5
- return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
- } catch (NoSuchAlgorithmException | IOException | TException e) {
- logger.error("Can not finish transfer schema to receiver", e);
- return false;
- }
- }
-
-
- private boolean checkMD5ForSchema(String md5OfSender) throws TException {
- String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
- if (md5OfSender.equals(md5OfReceiver)) {
- logger.info("Receiver has received schema successfully, retry.");
- return true;
- } else {
- logger
- .error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
- return false;
- }
- }
-
- private int readSyncSchemaPos(File syncSchemaLogFile) {
- try {
- if (syncSchemaLogFile.exists()) {
- try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
- return Integer.parseInt(br.readLine());
- }
- }
- } catch (IOException e) {
- logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
- }
- return 0;
- }
-
- private void writeSyncSchemaPos(File syncSchemaLogFile) {
- try {
- if (syncSchemaLogFile.exists()) {
- try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
- br.write(Integer.toString(schemaFileLinePos));
- }
- }
- } catch (IOException e) {
- logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
- }
- }
-
- private void clearSyncLog() {
+ private void endSync() {
+ // 1. Organize current local files based on sync result
for (Entry<String, Set<File>> entry : lastLocalFilesMap.entrySet()) {
entry.getValue()
.removeAll(successDeletedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
entry.getValue()
- .removeAll(sucessSyncedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
+ .removeAll(successSyncedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
}
File currentLocalFile = getCurrentLogFile();
File lastLocalFile = new File(config.getLastFileInfo());
+
+ // 2. Write file list to currentLocalFile
try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
for (Set<File> currentLocalFiles : lastLocalFilesMap.values()) {
for (File file : currentLocalFiles) {
@@ -548,7 +578,20 @@ public class DataTransferManager implements IDataTransferManager {
} catch (IOException e) {
logger.error("Can not clear sync log {}", lastLocalFile.getAbsoluteFile(), e);
}
+
+ // 3. Rename currentLocalFile to lastLocalFile
+ lastLocalFile.deleteOnExit();
currentLocalFile.renameTo(lastLocalFile);
+
+ // 4. delete snapshot directory
+ try {
+ FileUtils.deleteDirectory(new File(config.getSnapshotPath()));
+ } catch (IOException e) {
+ logger.error("Can not clear snapshot directory {}", config.getSnapshotPath(), e);
+ }
+
+ // 5. delete sync log file
+ getSyncLogFile().deleteOnExit();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index 3366b91..bff3f2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -25,45 +25,61 @@ import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.thrift.TException;
/**
- * SyncSender defines the methods of a sender in sync module.
+ * This interface defines the data transmission part of a synchronization task, which is also the
+ * most important part of the task. The files to be transmitted are screened out in
+ * {@code SyncFileManager}, and these files are then synchronized to the receiving end to complete
+ * the synchronization task.
*/
public interface IDataTransferManager {
- /**
- * Init
- */
void init();
/**
- * Connect to server.
+ * Establish a connection to the receiver end.
*/
void establishConnection(String serverIp, int serverPort) throws SyncConnectionException;
/**
- * Transfer UUID to receiver.
+ * Confirm identity: the receiver checks whether the sender has synchronization privileges.
*/
- boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException;
+ boolean confirmIdentity() throws SyncConnectionException, IOException;
/**
- * Send schema file to receiver.
+ * Sync the schema file to the receiver before any data is synced.
*/
void syncSchema() throws SyncConnectionException, TException;
- void syncDeletedFilesName(String sgName, Set<File> deletedFilesName)
+ /**
+ * For deleted files in a storage group, sync them to receiver side and load these data in
+ * receiver.
+ *
+ * @param sgName storage group name
+ * @param deletedFilesName list of deleted file names
+ */
+ void syncDeletedFilesNameInOneGroup(String sgName, Set<File> deletedFilesName)
throws SyncConnectionException, IOException;
/**
- * For all valid files, send it to receiver side and load these data in receiver.
+ * Execute a sync task for all data directories.
*/
- void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName)
- throws SyncConnectionException, IOException;
+ void syncAll() throws SyncConnectionException, IOException, TException;
/**
- * Execute a sync task.
+ * Execute a sync task for a data directory.
*/
void sync() throws SyncConnectionException, IOException;
/**
+ * For new valid files in a storage group, sync them to receiver side and load these data in
+ * receiver.
+ *
+ * @param sgName storage group name
+ * @param toBeSyncFiles list of new tsfile names
+ */
+ void syncDataFilesInOneGroup(String sgName, Set<File> toBeSyncFiles)
+ throws SyncConnectionException, IOException;
+
+ /**
* Stop sync process
*/
void stop();