You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/22 03:14:13 UTC
[incubator-iotdb] 02/05: complete manage module
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit ab86f899065649273be9cf188e7620c48fe120e0
Author: lta <li...@163.com>
AuthorDate: Wed Aug 21 15:39:25 2019 +0800
complete manage module
---
server/iotdb/conf/iotdb-sync-client.properties | 12 +-
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 4 +
.../db/conf/directories/DirectoryManager.java | 8 +-
.../org/apache/iotdb/db/metadata/MManager.java | 2 +-
.../db/sync/receiver/transfer/SyncServiceImpl.java | 65 +-
.../receiver/transfer/SyncServiceImplBackup.java | 736 +++++++++++++++++++++
.../iotdb/db/sync/sender/SyncFileManager.java | 208 ------
.../apache/iotdb/db/sync/sender/conf/Constans.java | 17 +-
.../db/sync/sender/conf/SyncSenderConfig.java | 129 ++--
.../db/sync/sender/conf/SyncSenderDescriptor.java | 36 +-
.../{IFileManager.java => ISyncFileManager.java} | 12 +-
.../db/sync/sender/manage/SyncFileManager.java | 157 +++++
.../db/sync/sender/recover/ISyncSenderLogger.java | 21 +-
.../db/sync/sender/recover/SyncSenderLogger.java | 86 +++
.../sync/sender/transfer/DataTransferManager.java | 542 ++++++++-------
.../sync/sender/transfer/IDataTransferManager.java | 16 +-
.../java/org/apache/iotdb/db/utils/SyncUtils.java | 39 +-
.../iotdb/db/sync/sender/SingleClientSyncTest.java | 4 +-
.../iotdb/db/sync/sender/SyncFileManagerTest.java | 2 +-
service-rpc/src/main/thrift/sync.thrift | 21 +-
20 files changed, 1460 insertions(+), 657 deletions(-)
diff --git a/server/iotdb/conf/iotdb-sync-client.properties b/server/iotdb/conf/iotdb-sync-client.properties
index 65b3074..3606ab4 100644
--- a/server/iotdb/conf/iotdb-sync-client.properties
+++ b/server/iotdb/conf/iotdb-sync-client.properties
@@ -17,19 +17,11 @@
# under the License.
#
-# Sync server port address
+# Sync receiver server address
server_ip=127.0.0.1
-# Sync client port
+# Sync receiver server port
server_port=5555
# The period time of sync process, the time unit is second.
sync_period_in_second=600
-
-# Set bufferWrite data absolute path of IoTDB
-# It needs to be set with iotdb_schema_directory, they have to belong to the same IoTDB
-# iotdb_bufferWrite_directory = D:\\iotdb\\data\\data\\settled
-
-# Set schema file absolute path of IoTDB
-# It needs to be set with iotdb_bufferWrite_directory, they have to belong to the same IoTDB
-# iotdb_schema_directory = D:\\iotdb\\data\\system\\schema\\mlog.txt
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 3ea1621..13042aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -65,4 +65,8 @@ public class IoTDBConstant {
public static final String USER = "User";
public static final String PRIVILEGE = "Privilege";
+ // data folder name
+ public static final String SEQUENCE_FLODER_NAME = "sequence";
+ public static final String UNSEQUENCE_FLODER_NAME = "unsequence";
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
index 9b4b58d..e3b8d5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategy;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
@@ -44,14 +45,16 @@ public class DirectoryManager {
sequenceFileFolders =
new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
for (int i = 0; i < sequenceFileFolders.size(); i++) {
- sequenceFileFolders.set(i, sequenceFileFolders.get(i) + File.separator + "sequence");
+ sequenceFileFolders
+ .set(i, sequenceFileFolders.get(i) + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME);
}
mkDirs(sequenceFileFolders);
unsequenceFileFolders =
new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
for (int i = 0; i < unsequenceFileFolders.size(); i++) {
- unsequenceFileFolders.set(i, unsequenceFileFolders.get(i) + File.separator + "unsequence");
+ unsequenceFileFolders.set(i,
+ unsequenceFileFolders.get(i) + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME);
}
mkDirs(unsequenceFileFolders);
@@ -120,6 +123,7 @@ public class DirectoryManager {
}
private static class DirectoriesHolder {
+
private static final DirectoryManager INSTANCE = new DirectoryManager();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 1116ff9..ad50573 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -80,7 +80,7 @@ public class MManager {
private MManager() {
schemaDir =
- IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "schema";
+ IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
File systemFolder = new File(schemaDir);
if (!systemFolder.exists()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 4866d41..02bb5b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -45,10 +45,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.MetadataOperationType;
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.sync.sender.conf.Constans;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ResultStatus;
import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -73,6 +74,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,22 +88,13 @@ public class SyncServiceImpl implements SyncService.Iface {
**/
private static final MManager metadataManger = MManager.getInstance();
- private static final String SYNC_SERVER = Constans.SYNC_SERVER;
+ private static final String SYNC_SERVER = Constans.SYNC_RECEIVER;
- private ThreadLocal<String> uuid = new ThreadLocal<>();
/**
* String means storage group,List means the set of new files(path) in local IoTDB and String
* means path of new Files
**/
private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>();
- /**
- * Map String1 means timeseries String2 means path of new Files, long means startTime
- **/
- private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>();
- /**
- * Map String1 means timeseries String2 means path of new Files, long means endTime
- **/
- private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>();
/**
* Total num of files that needs to be loaded
@@ -131,28 +124,41 @@ public class SyncServiceImpl implements SyncService.Iface {
/**
* Sync folder path of server
**/
- private String syncFolderPath;
+ private ThreadLocal<String> syncFolderPath = new ThreadLocal<>();
/**
* Sync data path of server
*/
- private String syncDataPath;
+ private ThreadLocal<String> syncDataPath = new ThreadLocal<>();
+
+ private ThreadLocal<String> currentSG = new ThreadLocal<>();
+
+
+ /**
+ * Verify IP address of sender
+ */
+ @Override
+ public ResultStatus checkIdentity(String ipAddress) {
+ Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
+ initPath();
+ return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress) ? ResultStatus.SUCCESS
+ : ResultStatus.FAILURE;
+ }
/**
* Init threadLocal variable and delete old useless files.
*/
@Override
- public boolean init(String storageGroup) {
+ public ResultStatus init(String storageGroup) {
logger.info("Sync process starts to receive data of storage group {}", storageGroup);
+ currentSG.set(storageGroup);
fileNum.set(0);
fileNodeMap.set(new HashMap<>());
- fileNodeStartTime.set(new HashMap<>());
- fileNodeEndTime.set(new HashMap<>());
try {
FileUtils.deleteDirectory(new File(syncDataPath));
} catch (IOException e) {
logger.error("cannot delete directory {} ", syncFolderPath);
- return false;
+ return ResultStatus.FAILURE;
}
for (String bufferWritePath : bufferWritePaths) {
bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
@@ -163,22 +169,11 @@ public class SyncServiceImpl implements SyncService.Iface {
FileUtils.deleteDirectory(backupDirectory);
} catch (IOException e) {
logger.error("cannot delete directory {} ", syncFolderPath);
- return false;
+ return ResultStatus.FAILURE;
}
}
}
- return true;
- }
-
- /**
- * Verify IP address of sender
- */
- @Override
- public boolean checkIdentity(String uuid, String ipAddress) {
- Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
- this.uuid.set(uuid);
- initPath();
- return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
+ return ResultStatus.SUCCESS;
}
/**
@@ -242,6 +237,16 @@ public class SyncServiceImpl implements SyncService.Iface {
return md5OfReceiver;
}
+ @Override
+ public String checkDataMD5(String md5) throws TException {
+ return null;
+ }
+
+ @Override
+ public void endSync() throws TException {
+
+ }
+
/**
* Load metadata from sender
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
new file mode 100644
index 0000000..6f2f754
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
@@ -0,0 +1,736 @@
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//package org.apache.iotdb.db.sync.receiver.transfer;
+//
+//import java.io.BufferedReader;
+//import java.io.File;
+//import java.io.FileInputStream;
+//import java.io.FileNotFoundException;
+//import java.io.FileOutputStream;
+//import java.io.IOException;
+//import java.math.BigInteger;
+//import java.nio.ByteBuffer;
+//import java.nio.channels.FileChannel;
+//import java.security.MessageDigest;
+//import java.util.ArrayList;
+//import java.util.Collections;
+//import java.util.HashMap;
+//import java.util.HashSet;
+//import java.util.Iterator;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.Map.Entry;
+//import java.util.Set;
+//import org.apache.commons.io.FileUtils;
+//import org.apache.commons.lang3.StringUtils;
+//import org.apache.iotdb.db.concurrent.ThreadName;
+//import org.apache.iotdb.db.conf.IoTDBConfig;
+//import org.apache.iotdb.db.conf.IoTDBDescriptor;
+//import org.apache.iotdb.db.conf.directories.DirectoryManager;
+//import org.apache.iotdb.db.engine.StorageEngine;
+//import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+//import org.apache.iotdb.db.exception.MetadataErrorException;
+//import org.apache.iotdb.db.exception.PathErrorException;
+//import org.apache.iotdb.db.exception.ProcessorException;
+//import org.apache.iotdb.db.exception.StorageEngineException;
+//import org.apache.iotdb.db.metadata.MManager;
+//import org.apache.iotdb.db.metadata.MetadataConstant;
+//import org.apache.iotdb.db.metadata.MetadataOperationType;
+//import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+//import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+//import org.apache.iotdb.db.sync.sender.conf.Constans;
+//import org.apache.iotdb.db.utils.FilePathUtils;
+//import org.apache.iotdb.db.utils.SyncUtils;
+//import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
+//import org.apache.iotdb.service.sync.thrift.SyncService;
+//import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+//import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+//import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+//import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+//import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+//import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+//import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+//import org.apache.iotdb.tsfile.read.common.Field;
+//import org.apache.iotdb.tsfile.read.common.Path;
+//import org.apache.iotdb.tsfile.read.common.RowRecord;
+//import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+//import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//public class SyncServiceImplBackup implements SyncService.Iface {
+//
+// private static final Logger logger = LoggerFactory.getLogger(SyncServiceImplBackup.class);
+//
+// private static final StorageEngine STORAGE_GROUP_MANAGER = StorageEngine.getInstance();
+// /**
+// * Metadata manager
+// **/
+// private static final MManager metadataManger = MManager.getInstance();
+//
+// private static final String SYNC_SERVER = Constans.SYNC_RECEIVER;
+//
+// private ThreadLocal<String> uuid = new ThreadLocal<>();
+// /**
+// * String means storage group,List means the set of new files(path) in local IoTDB and String
+// * means path of new Files
+// **/
+// private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>();
+// /**
+// * Map String1 means timeseries String2 means path of new Files, long means startTime
+// **/
+// private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>();
+// /**
+// * Map String1 means timeseries String2 means path of new Files, long means endTime
+// **/
+// private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>();
+//
+// /**
+// * Total num of files that needs to be loaded
+// */
+// private ThreadLocal<Integer> fileNum = new ThreadLocal<>();
+//
+// /**
+// * IoTDB config
+// **/
+// private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+//
+// /**
+// * IoTDB data directory
+// **/
+// private String baseDir = config.getBaseDir();
+//
+// /**
+// * IoTDB multiple bufferWrite directory
+// **/
+// private String[] bufferWritePaths = config.getDataDirs();
+//
+// /**
+// * The path to store metadata file of sender
+// */
+// private ThreadLocal<String> schemaFromSenderPath = new ThreadLocal<>();
+//
+// /**
+// * Sync folder path of server
+// **/
+// private String syncFolderPath;
+//
+// /**
+// * Sync data path of server
+// */
+// private String syncDataPath;
+//
+// /**
+// * Init threadLocal variable and delete old useless files.
+// */
+// @Override
+// public boolean init(String storageGroup) {
+// logger.info("Sync process starts to receive data of storage group {}", storageGroup);
+// fileNum.set(0);
+// fileNodeMap.set(new HashMap<>());
+// fileNodeStartTime.set(new HashMap<>());
+// fileNodeEndTime.set(new HashMap<>());
+// try {
+// FileUtils.deleteDirectory(new File(syncDataPath));
+// } catch (IOException e) {
+// logger.error("cannot delete directory {} ", syncFolderPath);
+// return false;
+// }
+// for (String bufferWritePath : bufferWritePaths) {
+// bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
+// String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
+// File backupDirectory = new File(backupPath, this.uuid.get());
+// if (backupDirectory.exists() && backupDirectory.list().length != 0) {
+// try {
+// FileUtils.deleteDirectory(backupDirectory);
+// } catch (IOException e) {
+// logger.error("cannot delete directory {} ", syncFolderPath);
+// return false;
+// }
+// }
+// }
+// return true;
+// }
+//
+// /**
+// * Verify IP address of sender
+// */
+// @Override
+// public boolean checkIdentity(String uuid, String ipAddress) {
+// Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
+// this.uuid.set(uuid);
+// initPath();
+// return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
+// }
+//
+// /**
+// * Init file path and clear data if last sync process failed.
+// */
+// private void initPath() {
+// baseDir = FilePathUtils.regularizePath(baseDir);
+// syncFolderPath = baseDir + SYNC_SERVER + File.separatorChar + this.uuid.get();
+// syncDataPath = syncFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
+// schemaFromSenderPath
+// .set(syncFolderPath + File.separator + MetadataConstant.METADATA_LOG);
+// }
+//
+// /**
+// * Acquire schema from sender
+// *
+// * @param status: FINIFSH_STATUS, SUCCESS_STATUS or PROCESSING_STATUS. status = FINISH_STATUS :
+// * finish receiving schema file, start to sync schema. status = PROCESSING_STATUS : the schema
+// * file has not received completely.SUCCESS_STATUS: load metadata.
+// */
+// @Override
+// public String syncSchema(String md5, ByteBuffer schema, SyncDataStatus status) {
+// String md5OfReceiver = Boolean.toString(Boolean.TRUE);
+// if (status == SyncDataStatus.SUCCESS_STATUS) {
+// /** sync metadata, include storage group and timeseries **/
+// return Boolean.toString(loadMetadata());
+// } else if (status == SyncDataStatus.PROCESSING_STATUS) {
+// File file = new File(schemaFromSenderPath.get());
+// if (!file.getParentFile().exists()) {
+// try {
+// file.getParentFile().mkdirs();
+// file.createNewFile();
+// } catch (IOException e) {
+// logger.error("Cannot make schema file {}.", file.getPath(), e);
+// md5OfReceiver = Boolean.toString(Boolean.FALSE);
+// }
+// }
+// try (FileOutputStream fos = new FileOutputStream(file, true);
+// FileChannel channel = fos.getChannel()) {
+// channel.write(schema);
+// } catch (Exception e) {
+// logger.error("Cannot insert data to file {}.", file.getPath(), e);
+// md5OfReceiver = Boolean.toString(Boolean.FALSE);
+// }
+// } else {
+// try (FileInputStream fis = new FileInputStream(schemaFromSenderPath.get())) {
+// MessageDigest md = MessageDigest.getInstance("MD5");
+// byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+// int n;
+// while ((n = fis.read(buffer)) != -1) {
+// md.update(buffer, 0, n);
+// }
+// md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
+// if (!md5.equals(md5OfReceiver)) {
+// FileUtils.forceDelete(new File(schemaFromSenderPath.get()));
+// }
+// } catch (Exception e) {
+// logger.error("Receiver cannot generate md5 {}", schemaFromSenderPath.get(), e);
+// }
+// }
+// return md5OfReceiver;
+// }
+//
+// /**
+// * Load metadata from sender
+// */
+// private boolean loadMetadata() {
+// if (new File(schemaFromSenderPath.get()).exists()) {
+// try (BufferedReader br = new BufferedReader(
+// new java.io.FileReader(schemaFromSenderPath.get()))) {
+// String metadataOperation;
+// while ((metadataOperation = br.readLine()) != null) {
+// operation(metadataOperation);
+// }
+// } catch (FileNotFoundException e) {
+// logger.error("Cannot read the file {}.",
+// schemaFromSenderPath.get(), e);
+// return false;
+// } catch (IOException e) {
+// /** multiple insert schema, ignore it **/
+// } catch (Exception e) {
+// logger.error("Parse metadata operation failed.", e);
+// return false;
+// }
+// }
+// return true;
+// }
+//
+// /**
+// * Operate metadata operation in MManager
+// *
+// * @param cmd metadata operation
+// */
+// private void operation(String cmd)
+// throws PathErrorException, IOException, MetadataErrorException {
+// String[] args = cmd.trim().split(",");
+// switch (args[0]) {
+// case MetadataOperationType.ADD_PATH_TO_MTREE:
+// Map<String, String> props;
+// String[] kv;
+// props = new HashMap<>(args.length - 5 + 1, 1);
+// for (int k = 5; k < args.length; k++) {
+// kv = args[k].split("=");
+// props.put(kv[0], kv[1]);
+// }
+// metadataManger.addPathToMTree(new Path(args[1]), TSDataType.deserialize(Short.valueOf(args[2])),
+// TSEncoding.deserialize(Short.valueOf(args[3])),
+// CompressionType.deserialize(Short.valueOf(args[4])),
+// props);
+// break;
+// case MetadataOperationType.DELETE_PATH_FROM_MTREE:
+// metadataManger.deletePaths(Collections.singletonList(new Path(args[1])));
+// break;
+// case MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE:
+// metadataManger.setStorageLevelToMTree(args[1]);
+// break;
+// case MetadataOperationType.ADD_A_PTREE:
+// metadataManger.addAPTree(args[1]);
+// break;
+// case MetadataOperationType.ADD_A_PATH_TO_PTREE:
+// metadataManger.addPathToPTree(args[1]);
+// break;
+// case MetadataOperationType.DELETE_PATH_FROM_PTREE:
+// metadataManger.deletePathFromPTree(args[1]);
+// break;
+// case MetadataOperationType.LINK_MNODE_TO_PTREE:
+// metadataManger.linkMNodeToPTree(args[1], args[2]);
+// break;
+// case MetadataOperationType.UNLINK_MNODE_FROM_PTREE:
+// metadataManger.unlinkMNodeFromPTree(args[1], args[2]);
+// break;
+// default:
+// logger.error("Unrecognizable command {}", cmd);
+// }
+// }
+//
+// /**
+// * Start receiving tsfile from sender
+// *
+// * @param status status = SUCCESS_STATUS : finish receiving one tsfile status = PROCESSING_STATUS
+// * : tsfile has not received completely.
+// */
+// @Override
+// public String syncData(String md5OfSender, List<String> filePathSplit,
+// ByteBuffer dataToReceive, SyncDataStatus status) {
+// String md5OfReceiver = Boolean.toString(Boolean.TRUE);
+// FileChannel channel;
+// /** Recombination File Path **/
+// String filePath = StringUtils.join(filePathSplit, File.separatorChar);
+// syncDataPath = FilePathUtils.regularizePath(syncDataPath);
+// filePath = syncDataPath + filePath;
+// if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data stream to add
+// File file = new File(filePath);
+// if (!file.getParentFile().exists()) {
+// try {
+// file.getParentFile().mkdirs();
+// file.createNewFile();
+// } catch (IOException e) {
+// logger.error("cannot make file {}", file.getPath(), e);
+// md5OfReceiver = Boolean.toString(Boolean.FALSE);
+// }
+// }
+// try (FileOutputStream fos = new FileOutputStream(file, true)) {// append new data
+// channel = fos.getChannel();
+// channel.write(dataToReceive);
+// } catch (IOException e) {
+// logger.error("cannot insert data to file {}", file.getPath(), e);
+// md5OfReceiver = Boolean.toString(Boolean.FALSE);
+//
+// }
+// } else { // all data in the same file has received successfully
+// try (FileInputStream fis = new FileInputStream(filePath)) {
+// MessageDigest md = MessageDigest.getInstance("MD5");
+// byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+// int n;
+// while ((n = fis.read(buffer)) != -1) {
+// md.update(buffer, 0, n);
+// }
+// md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
+// if (md5OfSender.equals(md5OfReceiver)) {
+// fileNum.set(fileNum.get() + 1);
+//
+// logger.info(String.format("Receiver has received %d files from sender", fileNum.get()));
+// } else {
+// FileUtils.forceDelete(new File(filePath));
+// }
+// } catch (Exception e) {
+// logger.error("Receiver cannot generate md5 {}", filePath, e);
+// }
+// }
+// return md5OfReceiver;
+// }
+//
+//
+// @Override
+// public boolean load() {
+// try {
+// getFileNodeInfo();
+// loadData();
+// } catch (Exception e) {
+// logger.error("fail to load data", e);
+// return false;
+// }
+// return true;
+// }
+//
+// /**
+// * Get all tsfiles' info which are sent from sender, it is preparing for merging these data
+// */
+// public void getFileNodeInfo() throws IOException {
+// File dataFileRoot = new File(syncDataPath);
+// File[] files = dataFileRoot.listFiles();
+// int processedNum = 0;
+// for (File storageGroupPB : files) {
+// List<String> filesPath = new ArrayList<>();
+// File[] filesSG = storageGroupPB.listFiles();
+// for (File fileTF : filesSG) { // fileTF means TsFiles
+// Map<String, Long> startTimeMap = new HashMap<>();
+// Map<String, Long> endTimeMap = new HashMap<>();
+// TsFileSequenceReader reader = null;
+// try {
+// reader = new TsFileSequenceReader(fileTF.getPath());
+// Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
+// Iterator<String> it = deviceIdMap.keySet().iterator();
+// while (it.hasNext()) {
+// String key = it.next();
+// TsDeviceMetadataIndex device = deviceIdMap.get(key);
+// startTimeMap.put(key, device.getStartTime());
+// endTimeMap.put(key, device.getEndTime());
+// }
+// } catch (IOException e) {
+// logger.error("Unable to read tsfile {}", fileTF.getPath());
+// throw new IOException(e);
+// } finally {
+// try {
+// if (reader != null) {
+// reader.close();
+// }
+// } catch (IOException e) {
+// logger.error("Cannot close tsfile stream {}", fileTF.getPath());
+// throw new IOException(e);
+// }
+// }
+// fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
+// fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap);
+// filesPath.add(fileTF.getPath());
+// processedNum++;
+// logger.info(String
+// .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get()));
+// fileNodeMap.get().put(storageGroupPB.getName(), filesPath);
+// }
+// }
+// }
+//
+//
+// /**
+// * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group
+// * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the
+// * possibility of updating historical data.
+// */
+// public void loadData() throws StorageEngineException {
+// syncDataPath = FilePathUtils.regularizePath(syncDataPath);
+// int processedNum = 0;
+// for (String storageGroup : fileNodeMap.get().keySet()) {
+// List<String> filesPath = fileNodeMap.get().get(storageGroup);
+// /** before load external tsFile, it is necessary to order files in the same storage group **/
+// Collections.sort(filesPath, (o1, o2) -> {
+// Map<String, Long> startTimePath1 = fileNodeStartTime.get().get(o1);
+// Map<String, Long> endTimePath2 = fileNodeEndTime.get().get(o2);
+// for (Entry<String, Long> entry : endTimePath2.entrySet()) {
+// if (startTimePath1.containsKey(entry.getKey())) {
+// if (startTimePath1.get(entry.getKey()) > entry.getValue()) {
+// return 1;
+// } else {
+// return -1;
+// }
+// }
+// }
+// return 0;
+// });
+//
+// for (String path : filesPath) {
+// // get startTimeMap and endTimeMap
+// Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
+// Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
+//
+// // create a new fileNode
+// String header = syncDataPath;
+// String relativePath = path.substring(header.length());
+// TsFileResource fileNode = new TsFileResource(
+// new File(DirectoryManager.getInstance().getNextFolderIndexForSequenceFile() +
+// File.separator + relativePath), startTimeMap, endTimeMap
+// );
+// // call interface of load external file
+// try {
+// if (!STORAGE_GROUP_MANAGER.appendFileToStorageGroupProcessor(storageGroup, fileNode, path)) {
+// // it is a file with unsequence data
+// if (config.isUpdateHistoricalDataPossibility()) {
+// loadOldData(path);
+// } else {
+// List<String> overlapFiles = STORAGE_GROUP_MANAGER.getOverlapFiles(
+// storageGroup,
+// fileNode, uuid.get());
+// if (overlapFiles.isEmpty()) {
+// loadOldData(path);
+// } else {
+// loadOldData(path, overlapFiles);
+// }
+// }
+// }
+// } catch (StorageEngineException | IOException | ProcessorException e) {
+// logger.error("Can not load external file {}", path);
+// throw new StorageEngineException(e);
+// }
+// processedNum++;
+// logger.info(String
+// .format("Merging files has completed : %d/%d", processedNum, fileNum.get()));
+// }
+// }
+// }
+//
+// /**
+// * Insert all data in the tsfile into IoTDB.
+// */
+// public void loadOldData(String filePath) throws IOException, ProcessorException {
+// Set<String> timeseriesSet = new HashSet<>();
+// TsFileSequenceReader reader = null;
+// QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
+// try {
+// /** use tsfile reader to get data **/
+// reader = new TsFileSequenceReader(filePath);
+// Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
+// Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet()
+// .iterator();
+// while (entryIterator.hasNext()) {
+// Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next();
+// String deviceId = deviceMIEntry.getKey();
+// TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue();
+// TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
+// List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList();
+// timeseriesSet.clear();
+// /** firstly, get all timeseries in the same device **/
+// for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) {
+// List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData
+// .getChunkMetaDataList();
+// for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
+// String measurementUID = chunkMetaData.getMeasurementUid();
+// measurementUID = deviceId + "." + measurementUID;
+// timeseriesSet.add(measurementUID);
+// }
+// }
+// /** Secondly, use tsFile Reader to form InsertPlan **/
+// ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
+// List<Path> paths = new ArrayList<>();
+// paths.clear();
+// for (String timeseries : timeseriesSet) {
+// paths.add(new Path(timeseries));
+// }
+// QueryExpression queryExpression = QueryExpression.create(paths, null);
+// QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+// while (queryDataSet.hasNext()) {
+// RowRecord record = queryDataSet.next();
+// List<Field> fields = record.getFields();
+// List<String> measurementList = new ArrayList<>();
+// List<String> insertValues = new ArrayList<>();
+// for (int i = 0; i < fields.size(); i++) {
+// Field field = fields.get(i);
+// if (!field.isNull()) {
+// measurementList.add(paths.get(i).getMeasurement());
+// if (fields.get(i).getDataType() == TSDataType.TEXT) {
+// insertValues.add(String.format("'%s'", field.toString()));
+// } else {
+// insertValues.add(String.format("%s", field.toString()));
+// }
+// }
+// }
+// if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
+// measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
+// throw new IOException("Inserting series data to IoTDB engine has failed.");
+// }
+// }
+// }
+// } catch (IOException e) {
+// logger.error("Can not parse tsfile into SQL", e);
+// throw new IOException(e);
+// } catch (ProcessorException e) {
+// logger.error("Meet error while processing non-query.");
+// throw new ProcessorException(e);
+// } finally {
+// try {
+// if (reader != null) {
+// reader.close();
+// }
+// } catch (IOException e) {
+// logger.error("Cannot close file stream {}", filePath, e);
+// }
+// }
+// }
+//
+// /**
+// * Insert those valid data in the tsfile into IoTDB
+// *
+// * @param overlapFiles:files which are conflict with the sync file
+// */
+// public void loadOldData(String filePath, List<String> overlapFiles)
+// throws IOException, ProcessorException {
+// Set<String> timeseriesList = new HashSet<>();
+// QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
+// Map<String, ReadOnlyTsFile> tsfilesReaders = openReaders(filePath, overlapFiles);
+// try {
+// TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
+// Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
+// Iterator<String> it = deviceIdMap.keySet().iterator();
+// while (it.hasNext()) {
+// String deviceID = it.next();
+// TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID);
+// TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
+// List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
+// .getChunkGroupMetaDataList();
+// timeseriesList.clear();
+// /** firstly, get all timeseries in the same device **/
+// for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
+// List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
+// for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) {
+// String measurementUID = timeSeriesChunkMetaData.getMeasurementUid();
+// measurementUID = deviceID + "." + measurementUID;
+// timeseriesList.add(measurementUID);
+// }
+// }
+// reader.close();
+//
+// /** secondly, use tsFile Reader to form SQL **/
+// ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
+// List<Path> paths = new ArrayList<>();
+// /** compare data with one timeseries in a round to get valid data **/
+// for (String timeseries : timeseriesList) {
+// paths.clear();
+// paths.add(new Path(timeseries));
+// Set<InsertPlan> originDataPoints = new HashSet<>();
+// QueryExpression queryExpression = QueryExpression.create(paths, null);
+// QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
+// Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet, paths, deviceID);
+//
+// /** get all data with the timeseries in all overlap files. **/
+// for (String overlapFile : overlapFiles) {
+// ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
+// QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(queryExpression);
+// originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap, paths, deviceID));
+// }
+//
+// /** If there has no overlap data with the timeseries, inserting all data in the sync file **/
+// if (originDataPoints.isEmpty()) {
+// for (InsertPlan insertPlan : newDataPoints) {
+// if (insertExecutor.insert(insertPlan)) {
+// throw new IOException("Inserting series data to IoTDB engine has failed.");
+// }
+// }
+// } else {
+// /** Compare every data to get valid data **/
+// for (InsertPlan insertPlan : newDataPoints) {
+// if (!originDataPoints.contains(insertPlan)) {
+// if (insertExecutor.insert(insertPlan)) {
+// throw new IOException("Inserting series data to IoTDB engine has failed.");
+// }
+// }
+// }
+// }
+// }
+// }
+// } catch (IOException e) {
+// logger.error("Can not parse tsfile into SQL", e);
+// throw new IOException(e);
+// } catch (ProcessorException e) {
+// logger.error("Meet error while processing non-query.", e);
+// throw new ProcessorException(e);
+// } finally {
+// try {
+// closeReaders(tsfilesReaders);
+// } catch (IOException e) {
+// logger.error("Cannot close file stream {}", filePath, e);
+// }
+// }
+// }
+//
+// private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet, List<Path> paths, String deviceID) throws IOException {
+// Set<InsertPlan> plans = new HashSet<>();
+// while (queryDataSet.hasNext()) {
+// RowRecord record = queryDataSet.next();
+// List<Field> fields = record.getFields();
+// /** get all data with the timeseries in the sync file **/
+// for (int i = 0; i < fields.size(); i++) {
+// Field field = fields.get(i);
+// String[] measurementList = new String[1];
+// if (!field.isNull()) {
+// measurementList[0] = paths.get(i).getMeasurement();
+// InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(),
+// measurementList, new String[]{field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString())
+// : field.toString()});
+// plans.add(insertPlan);
+// }
+// }
+// }
+// return plans;
+// }
+//
+// /**
+// * Open all tsfile reader and cache
+// */
+// private Map<String, ReadOnlyTsFile> openReaders(String filePath, List<String> overlapFiles)
+// throws IOException {
+// Map<String, ReadOnlyTsFile> tsfileReaders = new HashMap<>();
+// tsfileReaders.put(filePath, new ReadOnlyTsFile(new TsFileSequenceReader(filePath)));
+// for (String overlapFile : overlapFiles) {
+// tsfileReaders.put(overlapFile, new ReadOnlyTsFile(new TsFileSequenceReader(overlapFile)));
+// }
+// return tsfileReaders;
+// }
+//
+// /**
+// * Close all tsfile reader
+// */
+// private void closeReaders(Map<String, ReadOnlyTsFile> readers) throws IOException {
+// for (ReadOnlyTsFile tsfileReader : readers.values()) {
+// tsfileReader.close();
+// }
+// }
+//
+// /**
+// * Release threadLocal variable resources
+// */
+// @Override
+// public void cleanUp() {
+// uuid.remove();
+// fileNum.remove();
+// fileNodeMap.remove();
+// fileNodeStartTime.remove();
+// fileNodeEndTime.remove();
+// schemaFromSenderPath.remove();
+// try {
+// FileUtils.deleteDirectory(new File(syncFolderPath));
+// } catch (IOException e) {
+// logger.error("can not delete directory {}", syncFolderPath, e);
+// }
+// logger.info("Synchronization has finished!");
+// }
+//
+// public Map<String, List<String>> getFileNodeMap() {
+// return fileNodeMap.get();
+// }
+//
+// public void setFileNodeMap(Map<String, List<String>> fileNodeMap) {
+// this.fileNodeMap.set(fileNodeMap);
+// }
+//
+//}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java
deleted file mode 100644
index 18c3b60..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.sync.sender.conf.Constans;
-import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
-import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SyncFileManager is used to pick up those tsfiles need to sync.
- */
-public class SyncFileManager {
-
- private static final Logger logger = LoggerFactory.getLogger(SyncFileManager.class);
-
- /**
- * Files that need to be synchronized
- **/
- private Map<String, Set<String>> validAllFiles = new HashMap<>();
-
- /**
- * All tsfiles in last synchronization process
- **/
- private Set<String> lastLocalFiles = new HashSet<>();
-
- /**
- * All tsfiles in data directory
- **/
- private Map<String, Set<String>> currentLocalFiles = new HashMap<>();
-
- private SyncSenderConfig syncConfig = SyncSenderDescriptor.getInstance().getConfig();
-
- private IoTDBConfig systemConfig = IoTDBDescriptor.getInstance().getConfig();
-
- private static final String RESTORE_SUFFIX = ".restore";
-
- private SyncFileManager() {
- }
-
- public static final SyncFileManager getInstance() {
- return FileManagerHolder.INSTANCE;
- }
-
- /**
- * Initialize SyncFileManager.
- */
- public void init() throws IOException {
- validAllFiles.clear();
- lastLocalFiles.clear();
- currentLocalFiles.clear();
- getLastLocalFileList(syncConfig.getLastFileInfo());
- getCurrentLocalFileList(systemConfig.getDataDirs());
- getValidFileList();
- }
-
- /**
- * get files that needs to be synchronized
- */
- public void getValidFileList() {
- for (Entry<String, Set<String>> entry : currentLocalFiles.entrySet()) {
- for (String path : entry.getValue()) {
- if (!lastLocalFiles.contains(path)) {
- validAllFiles.get(entry.getKey()).add(path);
- }
- }
- }
- logger.info("Acquire list of valid files.");
- for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) {
- for (String path : entry.getValue()) {
- currentLocalFiles.get(entry.getKey()).remove(path);
- }
- }
- }
-
- /**
- * get last local file list.
- *
- * @param path path
- */
- public void getLastLocalFileList(String path) throws IOException {
- Set<String> fileList = new HashSet<>();
- File file = new File(path);
- if (!file.exists()) {
- try {
- file.createNewFile();
- } catch (IOException e) {
- throw new IOException("Cannot get last local file list", e);
- }
- } else {
- try (BufferedReader bf = new BufferedReader(new FileReader(file))) {
- String fileName;
- while ((fileName = bf.readLine()) != null) {
- fileList.add(fileName);
- }
- } catch (IOException e) {
- logger.error("Cannot get last local file list when reading file {}.",
- syncConfig.getLastFileInfo());
- throw new IOException(e);
- }
- }
- lastLocalFiles = fileList;
- }
-
- /**
- * get current local file list.
- *
- * @param paths paths in String[] structure
- */
- public void getCurrentLocalFileList(String[] paths) {
- for (String path : paths) {
- if (!new File(path).exists()) {
- continue;
- }
- File[] listFiles = new File(path).listFiles();
- for (File storageGroup : listFiles) {
- if (!storageGroup.isDirectory() || storageGroup.getName().equals(Constans.SYNC_CLIENT)) {
- continue;
- }
- getStorageGroupFiles(storageGroup);
- }
- }
- }
-
- private void getStorageGroupFiles(File storageGroup) {
- if (!currentLocalFiles.containsKey(storageGroup.getName())) {
- currentLocalFiles.put(storageGroup.getName(), new HashSet<>());
- }
- if (!validAllFiles.containsKey(storageGroup.getName())) {
- validAllFiles.put(storageGroup.getName(), new HashSet<>());
- }
- File[] files = storageGroup.listFiles();
- for (File file : files) {
- if (!file.getPath().endsWith(RESTORE_SUFFIX) && !new File(
- file.getPath() + RESTORE_SUFFIX).exists()) {
- currentLocalFiles.get(storageGroup.getName()).add(file.getPath());
- }
- }
- }
-
- /**
- * backup current local file information.
- *
- * @param backupFile backup file path
- */
- public void backupNowLocalFileInfo(String backupFile) {
- try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(backupFile))) {
- for (Entry<String, Set<String>> entry : currentLocalFiles.entrySet()) {
- for (String file : entry.getValue()) {
- bufferedWriter.write(file + "\n");
- }
- }
- } catch (IOException e) {
- logger.error("Cannot back up current local file info", e);
- }
- }
-
- public Map<String, Set<String>> getValidAllFiles() {
- return validAllFiles;
- }
-
- public Set<String> getLastLocalFiles() {
- return lastLocalFiles;
- }
-
- public Map<String, Set<String>> getCurrentLocalFiles() {
- return currentLocalFiles;
- }
-
- public void setCurrentLocalFiles(Map<String, Set<String>> newNowLocalFiles) {
- currentLocalFiles = newNowLocalFiles;
- }
-
- private static class FileManagerHolder {
-
- private static final SyncFileManager INSTANCE = new SyncFileManager();
- }
-}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index c8e2ce2..05b67c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -24,15 +24,18 @@ public class Constans {
}
public static final String CONFIG_NAME = "iotdb-sync-client.properties";
- public static final String SYNC_CLIENT = "sync-client";
- public static final String SYNC_SERVER = "sync-server";
+ public static final String SYNC_SENDER = "sync-sender";
+ public static final String SYNC_RECEIVER = "sync-receiver";
- public static final String LOCK_FILE_NAME = "sync-lock";
- public static final String UUID_FILE_NAME = "uuid.txt";
+ public static final String LOCK_FILE_NAME = "sync_lock";
+ public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
- public static final String DATA_SNAPSHOT_NAME = "data-snapshot";
+ public static final String DATA_SNAPSHOT_NAME = "snapshot";
+ public static final String SYNC_LOG_NAME = "sync.log";
+ public static final String CURRENT_SYNC_LOG_NAME = "current_sync.log";
- public static final String BACK_UP_DIRECTORY_NAME = "backup";
+ public static final String MESSAGE_DIGIT_NAME = "MD5";
+ public static final String SYNC_DIR_NAME_SEPARATOR = "_";
/**
* Split data file , block size at each transmission
@@ -42,7 +45,7 @@ public class Constans {
/**
* Max try when syncing the same file to receiver fails.
*/
- public static final int MAX_SYNC_FILE_TRY = 10;
+ public static final int MAX_SYNC_FILE_TRY = 5;
private static final SyncSenderConfig CONFIG = SyncSenderDescriptor.getInstance().getConfig();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index 572e5df..a076257 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -19,104 +19,35 @@
package org.apache.iotdb.db.sync.sender.conf;
import java.io.File;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.MetadataConstant;
-import org.apache.iotdb.db.utils.FilePathUtils;
public class SyncSenderConfig {
- private String[] seqFileDirectory = IoTDBDescriptor.getInstance().getConfig()
- .getDataDirs();
-
- private String dataDirectory = IoTDBDescriptor.getInstance().getConfig().getBaseDir();
-
- private String lockFilePath;
-
- private String uuidPath;
-
- private String lastFileInfo;
-
- private String[] snapshotPaths;
-
- private String schemaPath;
-
private String serverIp = "127.0.0.1";
private int serverPort = 5555;
private int syncPeriodInSecond = 10;
- /**
- * Init path
- */
- public void init() {
- schemaPath = IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + MetadataConstant.METADATA_LOG;
- if (dataDirectory.length() > 0
- && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) {
- dataDirectory += File.separatorChar;
- }
- lockFilePath =
- dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LOCK_FILE_NAME;
- uuidPath = dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME;
- lastFileInfo =
- dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME;
- snapshotPaths = new String[seqFileDirectory.length];
- for (int i = 0; i < seqFileDirectory.length; i++) {
- seqFileDirectory[i] = new File(seqFileDirectory[i]).getAbsolutePath();
- seqFileDirectory[i] = FilePathUtils.regularizePath(seqFileDirectory[i]);
- snapshotPaths[i] = seqFileDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar
- + Constans.DATA_SNAPSHOT_NAME
- + File.separatorChar;
- }
-
- }
-
- public String[] getSeqFileDirectory() {
- return seqFileDirectory;
- }
-
- public void setSeqFileDirectory(String[] seqFileDirectory) {
- this.seqFileDirectory = seqFileDirectory;
- }
-
- public String getDataDirectory() {
- return dataDirectory;
- }
-
- public void setDataDirectory(String dataDirectory) {
- this.dataDirectory = dataDirectory;
- }
-
- public String getUuidPath() {
- return uuidPath;
- }
-
- public void setUuidPath(String uuidPath) {
- this.uuidPath = uuidPath;
- }
-
- public String getLastFileInfo() {
- return lastFileInfo;
- }
+ private String senderPath;
- public void setLastFileInfo(String lastFileInfo) {
- this.lastFileInfo = lastFileInfo;
- }
+ private String lockFilePath;
- public String[] getSnapshotPaths() {
- return snapshotPaths;
- }
+ private String lastFileInfo;
- public void setSnapshotPaths(String[] snapshotPaths) {
- this.snapshotPaths = snapshotPaths;
- }
+ private String snapshotPath;
- public String getSchemaPath() {
- return schemaPath;
- }
-
- public void setSchemaPath(String schemaPath) {
- this.schemaPath = schemaPath;
+ /**
+ * Update paths based on data directory
+ */
+ public void update(String dataDirectory) {
+ senderPath = dataDirectory + File.separatorChar + Constans.SYNC_SENDER + File.separatorChar +
+ getSyncReceiverName();
+ lockFilePath = senderPath + File.separatorChar + Constans.LOCK_FILE_NAME;
+ lastFileInfo = senderPath + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME;
+ snapshotPath = senderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
+ if(!new File(snapshotPath).exists()){
+ new File(snapshotPath).mkdirs();
+ }
}
public String getServerIp() {
@@ -143,6 +74,14 @@ public class SyncSenderConfig {
this.syncPeriodInSecond = syncPeriodInSecond;
}
+ public String getSenderPath() {
+ return senderPath;
+ }
+
+ public void setSenderPath(String senderPath) {
+ this.senderPath = senderPath;
+ }
+
public String getLockFilePath() {
return lockFilePath;
}
@@ -150,4 +89,24 @@ public class SyncSenderConfig {
public void setLockFilePath(String lockFilePath) {
this.lockFilePath = lockFilePath;
}
+
+ public String getLastFileInfo() {
+ return lastFileInfo;
+ }
+
+ public void setLastFileInfo(String lastFileInfo) {
+ this.lastFileInfo = lastFileInfo;
+ }
+
+ public String getSnapshotPath() {
+ return snapshotPath;
+ }
+
+ public void setSnapshotPath(String snapshotPath) {
+ this.snapshotPath = snapshotPath;
+ }
+
+ public String getSyncReceiverName() {
+ return serverIp + Constans.SYNC_DIR_NAME_SEPARATOR + serverPort;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
index 2427c10..df1b834 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.utils.FilePathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +37,8 @@ public class SyncSenderDescriptor {
loadProps();
}
- public static final SyncSenderDescriptor getInstance() {
- return PostBackDescriptorHolder.INSTANCE;
+ public static SyncSenderDescriptor getInstance() {
+ return SyncSenderDescriptorHolder.INSTANCE;
}
public SyncSenderConfig getConfig() {
@@ -54,7 +53,6 @@ public class SyncSenderDescriptor {
* load an properties file and set sync config variables
*/
private void loadProps() {
- conf.init();
InputStream inputStream;
String url = System.getProperty(IoTDBConstant.IOTDB_CONF, null);
if (url == null) {
@@ -90,42 +88,20 @@ public class SyncSenderDescriptor {
conf.setSyncPeriodInSecond(Integer.parseInt(properties
.getProperty("sync_period_in_second",
Integer.toString(conf.getSyncPeriodInSecond()))));
- conf.setSchemaPath(properties.getProperty("iotdb_schema_directory", conf.getSchemaPath()));
- conf.setDataDirectory(
- properties.getProperty("iotdb_bufferWrite_directory", conf.getDataDirectory()));
- String dataDirectory = conf.getDataDirectory();
- if (dataDirectory.length() > 0
- && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) {
- dataDirectory += File.separatorChar;
- }
- conf.setUuidPath(
- dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME);
- conf.setLastFileInfo(
- dataDirectory + Constans.SYNC_CLIENT + File.separatorChar
- + Constans.LAST_LOCAL_FILE_NAME);
- String[] sequenceFileDirectory = conf.getSeqFileDirectory();
- String[] snapshots = new String[conf.getSeqFileDirectory().length];
- for (int i = 0; i < conf.getSeqFileDirectory().length; i++) {
- sequenceFileDirectory[i] = FilePathUtils.regularizePath(sequenceFileDirectory[i]);
- snapshots[i] = sequenceFileDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar
- + Constans.DATA_SNAPSHOT_NAME + File.separatorChar;
- }
- conf.setSeqFileDirectory(sequenceFileDirectory);
- conf.setSnapshotPaths(snapshots);
} catch (IOException e) {
- logger.warn("Cannot load config file because {}, use default configuration", e);
+ logger.warn("Cannot load config file, use default configuration.", e);
} catch (Exception e) {
- logger.warn("Error format in config file because {}, use default configuration", e);
+ logger.warn("Error format in config file, use default configuration.", e);
} finally {
try {
inputStream.close();
} catch (IOException e) {
- logger.error("Fail to close sync config file input stream because ", e);
+ logger.error("Fail to close sync config file input stream.", e);
}
}
}
- private static class PostBackDescriptorHolder {
+ private static class SyncSenderDescriptorHolder {
private static final SyncSenderDescriptor INSTANCE = new SyncSenderDescriptor();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
similarity index 69%
rename from server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java
rename to server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index b1ba08f..d4f7e33 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -18,8 +18,18 @@
*/
package org.apache.iotdb.db.sync.sender.manage;
-public interface IFileManager {
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+public interface ISyncFileManager {
+ void getCurrentLocalFiles(String dataDir);
+ void getLastLocalFiles(File lastLocalFile) throws IOException;
+
+ void getValidFiles(String dataDir) throws IOException;
+
+ void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
new file mode 100644
index 0000000..20f1dca
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.manage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncFileManager implements ISyncFileManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyncFileManager.class);
+
+ private Map<String, Set<File>> currentSealedLocalFilesMap;
+
+ private Map<String, Set<File>> lastLocalFilesMap;
+
+ private Map<String, Set<File>> deletedFilesMap;
+
+ private Map<String, Set<File>> toBeSyncedFilesMap;
+
+ private SyncFileManager() {
+
+ }
+
+ public static final SyncFileManager getInstance() {
+ return SyncFileManagerHolder.INSTANCE;
+ }
+
+ @Override
+ public void getCurrentLocalFiles(String dataDir) {
+ LOGGER.info("Start to get current local files in data folder {}", dataDir);
+ // get all files in data dir sequence folder
+ Map<String, Set<File>> currentAllLocalFiles = new HashMap<>();
+ File[] allSGFolders = new File(
+ dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME)
+ .listFiles();
+ for (File sgFolder : allSGFolders) {
+ currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashSet<>());
+ Arrays.stream(sgFolder.listFiles()).forEach(file -> currentAllLocalFiles.get(sgFolder.getName())
+ .add(new File(sgFolder.getAbsolutePath(), file.getName())));
+ }
+
+ // get sealed tsfiles
+ currentSealedLocalFilesMap = new HashMap<>();
+ for (Entry<String, Set<File>> entry : currentAllLocalFiles.entrySet()) {
+ String sgName = entry.getKey();
+ currentSealedLocalFilesMap.putIfAbsent(sgName, new HashSet<>());
+ for (File file : entry.getValue()) {
+ if (file.getName().endsWith(ModificationFile.FILE_SUFFIX) || file.getName()
+ .endsWith(TsFileResource.RESOURCE_SUFFIX)) {
+ continue;
+ }
+ if (new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists() && !new File(
+ file.getAbsolutePath() + ModificationFile.FILE_SUFFIX).exists()) {
+ currentSealedLocalFilesMap.get(sgName).add(file);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void getLastLocalFiles(File lastLocalFileInfo) throws IOException {
+ LOGGER.info("Start to get last local files from last local file info {}",
+ lastLocalFileInfo.getAbsoluteFile());
+ lastLocalFilesMap = new HashMap<>();
+ try (BufferedReader reader = new BufferedReader(new FileReader(lastLocalFileInfo))) {
+ String fileName;
+ while ((fileName = reader.readLine()) != null) {
+ String sgName = new File(fileName).getParent();
+ lastLocalFilesMap.putIfAbsent(sgName, new HashSet<>());
+ lastLocalFilesMap.get(sgName).add(new File(fileName));
+ }
+ }
+ }
+
+ @Override
+ public void getValidFiles(String dataDir) throws IOException {
+ getCurrentLocalFiles(dataDir);
+ getLastLocalFiles(new File(SyncSenderDescriptor.getInstance().getConfig().getLastFileInfo()));
+ toBeSyncedFilesMap = new HashMap<>();
+ deletedFilesMap = new HashMap<>();
+ for(Entry<String, Set<File>> entry: currentSealedLocalFilesMap.entrySet()){
+ String sgName = entry.getKey();
+ toBeSyncedFilesMap.putIfAbsent(sgName, new HashSet<>());
+ deletedFilesMap.putIfAbsent(sgName, new HashSet<>());
+ for(File newFile:currentSealedLocalFilesMap.get(sgName)){
+ if(!lastLocalFilesMap.get(sgName).contains(newFile)){
+ toBeSyncedFilesMap.get(sgName).add(newFile);
+ }
+ }
+ for(File oldFile:lastLocalFilesMap.get(sgName)){
+ if(!currentSealedLocalFilesMap.get(sgName).contains(oldFile)){
+ deletedFilesMap.get(sgName).add(oldFile);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles) {
+
+ }
+
+ public Map<String, Set<File>> getCurrentSealedLocalFilesMap() {
+ return currentSealedLocalFilesMap;
+ }
+
+ public Map<String, Set<File>> getLastLocalFilesMap() {
+ return lastLocalFilesMap;
+ }
+
+ public Map<String, Set<File>> getDeletedFilesMap() {
+ return deletedFilesMap;
+ }
+
+ public Map<String, Set<File>> getToBeSyncedFilesMap() {
+ return toBeSyncedFilesMap;
+ }
+
+ private static class SyncFileManagerHolder {
+
+ private static final SyncFileManager INSTANCE = new SyncFileManager();
+
+ private SyncFileManagerHolder() {
+
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
index c516936..15df693 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -18,18 +18,27 @@
*/
package org.apache.iotdb.db.sync.sender.recover;
+import java.io.File;
+import java.io.IOException;
+
public interface ISyncSenderLogger {
- void startSyncDeletedFilesName();
+ void startSync() throws IOException;
+
+ void endSync() throws IOException;
+
+ void startSyncDeletedFilesName() throws IOException;
+
+ void finishSyncDeletedFileName(File file) throws IOException;
- void finishSyncDeletedFileName(String fileName);
+ void endSyncDeletedFilsName() throws IOException;
- void endSyncDeletedFilsName();
+ void startSyncTsFiles() throws IOException;
- void startSyncTsFiles();
+ void finishSyncTsfile(File file) throws IOException;
- void finishSyncTsfile(String fileName);
+ void endSyncTsFiles() throws IOException;
- void endSyncTsFiles();
+ void close() throws IOException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
new file mode 100644
index 0000000..8171d0f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
@@ -0,0 +1,86 @@
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class SyncSenderLogger implements ISyncSenderLogger {
+
+ public static final String SYNC_START = "sync start";
+ public static final String SYNC_END = "sync end";
+ public static final String SYNC_DELETED_FILE_NAME_START = "sync deleted file names start";
+ public static final String SYNC_DELETED_FILE_NAME_END = "sync deleted file names end";
+ public static final String SYNC_TSFILE_START = "sync tsfile start";
+ public static final String SYNC_TSFILE_END = "sync tsfile end";
+ private BufferedWriter bw;
+
+ public SyncSenderLogger(String filePath) throws IOException {
+ this.bw = new BufferedWriter(new FileWriter(filePath));
+ }
+
+ public SyncSenderLogger(File file) throws IOException {
+ this(file.getAbsolutePath());
+ }
+
+ @Override
+ public void startSync() throws IOException {
+ bw.write(SYNC_START);
+ bw.newLine();
+ bw.flush();
+ }
+
+ @Override
+ public void endSync() throws IOException {
+ bw.write(SYNC_END);
+ bw.newLine();
+ bw.flush();
+ }
+
+ @Override
+ public void startSyncDeletedFilesName() throws IOException {
+ bw.write(SYNC_DELETED_FILE_NAME_START);
+ bw.newLine();
+ bw.flush();
+ }
+
+ @Override
+ public void finishSyncDeletedFileName(File file) throws IOException {
+ bw.write(file.getAbsolutePath());
+ bw.newLine();
+ bw.flush();
+ }
+
+ @Override
+ public void endSyncDeletedFilsName() throws IOException {
+ bw.write(SYNC_DELETED_FILE_NAME_END);
+ bw.newLine();
+ bw.flush();
+ }
+
+ @Override
+ public void startSyncTsFiles() throws IOException {
+ bw.write(SYNC_TSFILE_START);
+ bw.newLine();
+ bw.flush();
+ }
+
+ @Override
+ public void finishSyncTsfile(File file) throws IOException {
+ bw.write(file.getAbsolutePath());
+ bw.newLine();
+ bw.flush();
+ }
+
+ @Override
+ public void endSyncTsFiles() throws IOException {
+ bw.write(SYNC_TSFILE_END);
+ bw.newLine();
+ bw.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ bw.close();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 59dc27e..c616376 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -1,28 +1,27 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.iotdb.db.sync.sender.transfer;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.math.BigInteger;
@@ -33,27 +32,30 @@ import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
-import java.util.ArrayList;
+import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.SyncConnectionException;
-import org.apache.iotdb.db.sync.sender.SyncFileManager;
+import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.sync.sender.conf.Constans;
import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.manage.SyncFileManager;
+import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ResultStatus;
import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
import org.apache.iotdb.service.sync.thrift.SyncService;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -70,29 +72,36 @@ public class DataTransferManager implements IDataTransferManager {
private static final Logger logger = LoggerFactory.getLogger(DataTransferManager.class);
- private TTransport transport;
+ private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
- private SyncService.Client serviceClient;
+ private static final int BATCH_LINE = 1000;
- private List<String> schema = new ArrayList<>();
+ private int schemaFileLinePos;
- private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+ private TTransport transport;
+
+ private SyncService.Client serviceClient;
/**
* Files that need to be synchronized
*/
- private Map<String, Set<String>> validAllFiles;
+ private Map<String, Set<File>> toBeSyncedFilesMap;
- /**
- * All tsfiles in data directory
- **/
- private Map<String, Set<String>> currentLocalFiles;
+ private Map<String, Set<File>> deletedFilesMap;
+
+ private Map<String, Set<File>> sucessSyncedFilesMap;
+
+ private Map<String, Set<File>> successDeleyedFilesMap;
+
+ private Map<String, Set<File>> lastLocalFilesMap;
/**
* If true, sync is in execution.
**/
private volatile boolean syncStatus = false;
+ private SyncSenderLogger syncLog;
+
/**
* Key means storage group, Set means corresponding tsfiles
**/
@@ -106,14 +115,12 @@ public class DataTransferManager implements IDataTransferManager {
init();
}
- public static final DataTransferManager getInstance() {
+ public static DataTransferManager getInstance() {
return InstanceHolder.INSTANCE;
}
/**
* Create a sender and sync files to the receiver.
- *
- * @param args not used
*/
public static void main(String[] args) throws IOException {
Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
@@ -137,7 +144,7 @@ public class DataTransferManager implements IDataTransferManager {
private void startMonitor() {
executorService.scheduleWithFixedDelay(() -> {
if (syncStatus) {
- logger.info("Sync process is in execution!");
+ logger.info("Sync process for receiver {} is in execution!", config.getSyncReceiverName());
}
}, Constans.SYNC_MONITOR_DELAY, Constans.SYNC_MONITOR_PERIOD, TimeUnit.SECONDS);
}
@@ -148,8 +155,8 @@ public class DataTransferManager implements IDataTransferManager {
private void startTimedTask() {
executorService.scheduleWithFixedDelay(() -> {
try {
- sync();
- } catch (SyncConnectionException | IOException e) {
+ syncAll();
+ } catch (SyncConnectionException | IOException | TException e) {
logger.error("Sync failed", e);
stop();
}
@@ -162,101 +169,203 @@ public class DataTransferManager implements IDataTransferManager {
executorService = null;
}
- /**
- * Execute a sync task.
- */
- @Override
- public void sync() throws SyncConnectionException, IOException {
+ public void syncAll() throws SyncConnectionException, IOException, TException {
- //1. Clear old snapshots if necessary
- for (String snapshotPath : config.getSnapshotPaths()) {
- if (new File(snapshotPath).exists() && new File(snapshotPath).list().length != 0) {
- // It means that the last task of sync does not succeed! Clear the files and start to sync again
- FileUtils.deleteDirectory(new File(snapshotPath));
- }
- }
-
- // 2. Acquire valid files and check
- syncFileManager.init();
- validAllFiles = syncFileManager.getValidAllFiles();
- currentLocalFiles = syncFileManager.getCurrentLocalFiles();
- if (SyncUtils.isEmpty(validAllFiles)) {
- logger.info("There has no file to sync !");
- return;
- }
-
- // 3. Connect to sync server and Confirm Identity
+ // 1. Connect to sync receiver and confirm identity
establishConnection(config.getServerIp(), config.getServerPort());
- if (!confirmIdentity(config.getUuidPath())) {
+ if (!confirmIdentity(config.getSenderPath())) {
logger.error("Sorry, you do not have the permission to connect to sync receiver.");
System.exit(1);
}
- // 4. Create snapshot
- for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) {
- validFileSnapshot.put(entry.getKey(), makeFileSnapshot(entry.getValue()));
+ // 2. Sync Schema
+ syncSchema();
+
+ // 3. Sync all data
+ String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ for (String dataDir : dataDirs) {
+ logger.info("Start to sync data in data dir {}", dataDir);
+ config.update(dataDir);
+ SyncFileManager.getInstance().getValidFiles(dataDir);
+ lastLocalFilesMap = SyncFileManager.getInstance().getLastLocalFilesMap();
+ deletedFilesMap = SyncFileManager.getInstance().getDeletedFilesMap();
+ toBeSyncedFilesMap = SyncFileManager.getInstance().getToBeSyncedFilesMap();
+ checkRecovery();
+ if (SyncUtils.isEmpty(deletedFilesMap) && SyncUtils.isEmpty(toBeSyncedFilesMap)) {
+ logger.info("There has no data to sync in data dir {}", dataDir);
+ continue;
+ }
+ sync();
+ logger.info("Finish to sync data in data dir {}", dataDir);
}
- syncStatus = true;
+ // 4. notify receiver that synchronization finish
+ // At this point the synchronization has finished even if connection fails
+ try {
+ serviceClient.endSync();
+ transport.close();
+ logger.info("Sync process has finished.");
+ } catch (TException e) {
+ logger.error("Unable to connect to receiver.", e);
+ }
+ }
+ /**
+ * Execute a sync task.
+ */
+ @Override
+ public void sync() throws IOException {
try {
- // 5. Sync schema
- syncSchema();
+ syncStatus = true;
+ syncLog = new SyncSenderLogger(getSchemaLogFile());
+
+ // 1. Sync data
+ for (Entry<String, Set<File>> entry : deletedFilesMap.entrySet()) {
+ // TODO deal with the situation
+ try {
+ if (serviceClient.init(entry.getKey()) == ResultStatus.FAILURE) {
+ throw new SyncConnectionException("unable init receiver");
+ }
+ } catch (TException | SyncConnectionException e) {
+ throw new SyncConnectionException("Unable to connect to receiver", e);
+ }
+ logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
+ syncDeletedFilesName(entry.getKey(), entry.getValue());
+ syncDataFilesInOneGroup(entry.getKey(), entry.getValue());
+ }
- // 6. Sync data
- syncAllData();
- } catch (SyncConnectionException e) {
+ // 2. Clear sync log
+ clearSyncLog();
+
+ } catch (SyncConnectionException | TException e) {
logger.error("cannot finish sync process", e);
+ } finally {
+ if (syncLog != null) {
+ syncLog.close();
+ }
syncStatus = false;
- return;
}
+ }
- // 7. clear snapshot
- for (String snapshotPath : config.getSnapshotPaths()) {
- FileUtils.deleteDirectory(new File(snapshotPath));
- }
+ private void checkRecovery() {
+
+ }
+
+ public void clearSyncLog() {
- // 8. notify receiver that synchronization finish
- // At this point the synchronization has finished even if connection fails
- try {
- serviceClient.cleanUp();
- } catch (TException e) {
- logger.error("Unable to connect to receiver.", e);
- }
- transport.close();
- logger.info("Sync process has finished.");
- syncStatus = false;
}
@Override
- public void syncAllData() throws SyncConnectionException {
- for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) {
- Set<String> validFiles = entry.getValue();
- Set<String> validSnapshot = validFileSnapshot.get(entry.getKey());
- if (validSnapshot.isEmpty()) {
- continue;
+ public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) {
+ if (deletedFilesName.isEmpty()) {
+ logger.info("There has no deleted files to be synced in storage group {}", sgName);
+ return;
+ }
+ logger.info("Start to sync names of deleted files in storage group {}", sgName);
+ for(File file:deletedFilesName){
+ try {
+ serviceClient.syncDeletedFileName(file.getName());
+ } catch (TException e) {
+ logger.error("Can not sync deleted file name {}, skip it.", file);
}
- logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
+ }
+ logger.info("Finish to sync names of deleted files in storage group {}", sgName);
+ }
+
+ @Override
+ public void syncDataFilesInOneGroup(String sgName, Set<File> toBeSyncFiles)
+ throws SyncConnectionException {
+ Set<String> validSnapshot = validFileSnapshot.get(sgName);
+ if (validSnapshot.isEmpty()) {
+ logger.info("There has no new tsfiles to be synced in storage group {}", sgName);
+ return;
+ }
+ logger.info("Sync process starts to transfer data of storage group {}", sgName);
+ int cnt = 0;
+ for (String tsfilePath : toBeSyncFiles) {
+ cnt++;
+ File snapshotFile = null;
try {
- if (!serviceClient.init(entry.getKey())) {
- throw new SyncConnectionException("unable init receiver");
+ snapshotFile = makeFileSnapshot(tsfilePath);
+ syncSingleFile(snapshotFile);
+ logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
+ } catch (IOException e) {
+ logger.info(
+ "Tsfile {} can not make snapshot, so skip the tsfile and continue to sync other tsfiles",
+ tsfilePath, e);
+ } finally {
+ if(snapshotFile != null) {
+ snapshotFile.deleteOnExit();
}
- } catch (TException e) {
- throw new SyncConnectionException("Unable to connect to receiver", e);
}
- syncData(validSnapshot);
- if (afterSynchronization()) {
- currentLocalFiles.get(entry.getKey()).addAll(validFiles);
- syncFileManager.setCurrentLocalFiles(currentLocalFiles);
- syncFileManager.backupNowLocalFileInfo(config.getLastFileInfo());
- logger.info("Sync process has finished storage group {}.", entry.getKey());
- } else {
- logger.error("Receiver cannot sync data, abandon this synchronization of storage group {}",
- entry.getKey());
+ }
+ logger.info("Sync process has finished storage group {}.", sgName);
+ }
+
+ private File makeFileSnapshot(String filePath) throws IOException {
+ String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
+ File newFile = new File(snapshotFilePath);
+ if (!newFile.getParentFile().exists()) {
+ newFile.getParentFile().mkdirs();
+ }
+ Path link = FileSystems.getDefault().getPath(snapshotFilePath);
+ Path target = FileSystems.getDefault().getPath(filePath);
+ Files.createLink(link, target);
+ return newFile;
+ }
+
+
+ /**
+ * Transfer data of a storage group to receiver.
+ *
+ */
+ private void syncSingleFile(File snapshotFile) throws SyncConnectionException {
+ try {
+ int retryCount = 0;
+ MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+ outer:
+ while (true) {
+ retryCount++;
+ if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
+ throw new SyncConnectionException(String
+ .format("Can not sync file %s after %s tries.", snapshotFile.getAbsoluteFile(),
+ Constans.MAX_SYNC_FILE_TRY));
+ }
+ md.reset();
+ byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+ int dataLength;
+ try (FileInputStream fis = new FileInputStream(snapshotFile);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
+ while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send
+ bos.write(buffer, 0, dataLength);
+ md.update(buffer, 0, dataLength);
+ ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+ bos.reset();
+ if (!Boolean.parseBoolean(serviceClient
+ .syncData(null, snapshotFile.getName(), buffToSend,
+ SyncDataStatus.PROCESSING_STATUS))) {
+ logger.info("Receiver failed to receive data from {}, retry.",
+ snapshotFile.getAbsoluteFile());
+ continue outer;
+ }
+ }
+ }
+
+ // the file is sent successfully
+ String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
+ String md5OfReceiver = serviceClient.syncData(md5OfSender, snapshotFile.getName(),
+ null, SyncDataStatus.FINISH_STATUS);
+ if (md5OfSender.equals(md5OfReceiver)) {
+ logger.info("Receiver has received {} successfully.", snapshotFile.getAbsoluteFile());
+ break;
+ }
}
+ } catch (IOException e) {
+ throw new SyncConnectionException("Cannot sync data with receiver.", e);
}
}
+
/**
* Establish a connection between the sender and the receiver.
*
@@ -271,7 +380,6 @@ public class DataTransferManager implements IDataTransferManager {
try {
transport.open();
} catch (TTransportException e) {
- syncStatus = false;
logger.error("Cannot connect to server");
throw new SyncConnectionException(e);
}
@@ -317,19 +425,18 @@ public class DataTransferManager implements IDataTransferManager {
}
private String generateUUID() {
- return Constans.SYNC_CLIENT + UUID.randomUUID().toString().replaceAll("-", "");
+ return Constans.SYNC_SENDER + UUID.randomUUID().toString().replaceAll("-", "");
}
/**
* Create snapshots for valid files.
*/
@Override
- public Set<String> makeFileSnapshot(Set<String> validFiles) throws IOException {
+ public Set<String> makeFileSnapshot(Set<String> validFiles) {
Set<String> validFilesSnapshot = new HashSet<>();
- try {
- for (String filePath : validFiles) {
+ for (String filePath : validFiles) {
+ try {
String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
- validFilesSnapshot.add(snapshotFilePath);
File newFile = new File(snapshotFilePath);
if (!newFile.getParentFile().exists()) {
newFile.getParentFile().mkdirs();
@@ -337,144 +444,126 @@ public class DataTransferManager implements IDataTransferManager {
Path link = FileSystems.getDefault().getPath(snapshotFilePath);
Path target = FileSystems.getDefault().getPath(filePath);
Files.createLink(link, target);
+ validFilesSnapshot.add(snapshotFilePath);
+ } catch (IOException e) {
+ logger.error("Can not make snapshot for file {}", filePath);
}
- } catch (IOException e) {
- logger.error("Can not make fileSnapshot");
- throw new IOException(e);
}
return validFilesSnapshot;
}
/**
- * Transfer data of a storage group to receiver.
- *
- * @param fileSnapshotList list of sending snapshot files in a storage group.
- */
- public void syncData(Set<String> fileSnapshotList) throws SyncConnectionException {
- try {
- int successNum = 0;
- for (String snapshotFilePath : fileSnapshotList) {
- successNum++;
- File file = new File(snapshotFilePath);
- List<String> filePathSplit = new ArrayList<>();
- String os = System.getProperty("os.name");
- if (os.toLowerCase().startsWith("windows")) {
- String[] name = snapshotFilePath.split(File.separator + File.separator);
- filePathSplit.add(name[name.length - 2]);
- filePathSplit.add(name[name.length - 1]);
- } else {
- String[] name = snapshotFilePath.split(File.separator);
- filePathSplit.add(name[name.length - 2]);
- filePathSplit.add(name[name.length - 1]);
- }
- int retryCount = 0;
- // Get md5 of the file.
- MessageDigest md = MessageDigest.getInstance("MD5");
- outer:
- while (true) {
- retryCount++;
- // Sync all data to receiver
- if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
- throw new SyncConnectionException(String
- .format("can not sync file %s after %s tries.", snapshotFilePath,
- Constans.MAX_SYNC_FILE_TRY));
- }
- md.reset();
- byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
- int dataLength;
- try (FileInputStream fis = new FileInputStream(file);
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
- while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send
- bos.write(buffer, 0, dataLength);
- md.update(buffer, 0, dataLength);
- ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
- bos.reset();
- if (!Boolean.parseBoolean(serviceClient
- .syncData(null, filePathSplit, buffToSend, SyncDataStatus.PROCESSING_STATUS))) {
- logger.info("Receiver failed to receive data from {}, retry.", snapshotFilePath);
- continue outer;
- }
- }
- }
-
- // the file is sent successfully
- String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
- String md5OfReceiver = serviceClient.syncData(md5OfSender, filePathSplit,
- null, SyncDataStatus.FINISH_STATUS);
- if (md5OfSender.equals(md5OfReceiver)) {
- logger.info("Receiver has received {} successfully.", snapshotFilePath);
- break;
- }
- }
- logger.info(String.format("Task of synchronization has completed %d/%d.", successNum,
- fileSnapshotList.size()));
- }
- } catch (Exception e) {
- throw new SyncConnectionException("Cannot sync data with receiver.", e);
- }
- }
-
- /**
* Sync schema with receiver.
*/
@Override
- public void syncSchema() throws SyncConnectionException {
+ public void syncSchema() throws SyncConnectionException, TException {
int retryCount = 0;
- outer:
+ serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
while (true) {
- retryCount++;
if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
throw new SyncConnectionException(String
- .format("can not sync schema after %s tries.", Constans.MAX_SYNC_FILE_TRY));
+ .format("Can not sync schema after %s tries.", Constans.MAX_SYNC_FILE_TRY));
}
- byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
- try (FileInputStream fis = new FileInputStream(new File(config.getSchemaPath()));
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
- // Get md5 of the file.
- MessageDigest md = MessageDigest.getInstance("MD5");
- int dataLength;
- while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send
- bos.write(buffer, 0, dataLength);
- md.update(buffer, 0, dataLength);
+ try {
+ if (tryToSyncSchema()) {
+ writeSyncSchemaPos(getSchemaPosFile());
+ break;
+ }
+ } finally {
+ retryCount++;
+ }
+ }
+ }
+
+ private boolean tryToSyncSchema() {
+ int schemaPos = readSyncSchemaPos(getSchemaPosFile());
+
+ // start to sync file data and get md5 of this file.
+ try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
+ schemaFileLinePos = 0;
+ while (schemaFileLinePos++ <= schemaPos) {
+ br.readLine();
+ }
+ MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+ String line;
+ int cntLine = 0;
+ while ((line = br.readLine()) != null) {
+ schemaFileLinePos++;
+ byte[] singleLineData = BytesUtils.stringToBytes(line);
+ bos.write(singleLineData);
+ md.update(singleLineData);
+ if (cntLine++ == BATCH_LINE) {
ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
bos.reset();
// PROCESSING_STATUS represents there is still schema buffer to send.
- if (!Boolean.parseBoolean(
- serviceClient.syncSchema(null, buffToSend, SyncDataStatus.PROCESSING_STATUS))) {
+ if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
logger.error("Receiver failed to receive metadata, retry.");
- continue outer;
+ return false;
}
+ cntLine = 0;
}
- bos.close();
- String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
- String md5OfReceiver = serviceClient
- .syncSchema(md5OfSender, null, SyncDataStatus.FINISH_STATUS);
- if (md5OfSender.equals(md5OfReceiver)) {
- logger.info("Receiver has received schema successfully.");
- /** receiver start to load metadata **/
- if (Boolean
- .parseBoolean(serviceClient.syncSchema(null, null, SyncDataStatus.SUCCESS_STATUS))) {
- throw new SyncConnectionException("Receiver failed to load metadata");
- }
- break;
+ }
+ if (bos.size() != 0) {
+ ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+ bos.reset();
+ if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
+ logger.error("Receiver failed to receive metadata, retry.");
+ return false;
}
- } catch (Exception e) {
- logger.error("Cannot sync schema ", e);
- throw new SyncConnectionException(e);
}
+
+ // check md5
+ return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
+ } catch (NoSuchAlgorithmException | IOException | TException e) {
+ logger.error("Can not finish transfer schema to receiver", e);
+ return false;
}
}
- @Override
- public boolean afterSynchronization() throws SyncConnectionException {
- boolean successOrNot;
+ private File getSchemaPosFile() {
+ return new File(config.getSenderPath(), Constans.SCHEMA_POS_FILE_NAME)
+ }
+
+ private File getSchemaLogFile() {
+ return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
+ MetadataConstant.METADATA_LOG);
+ }
+
+ private boolean checkMD5ForSchema(String md5OfSender) throws TException {
+ String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
+ if (md5OfSender.equals(md5OfReceiver)) {
+ logger.info("Receiver has received schema successfully, retry.");
+ return true;
+ } else {
+ logger.error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+ return false;
+ }
+ }
+
+ private int readSyncSchemaPos(File syncSchemaLogFile) {
try {
- successOrNot = serviceClient.load();
- } catch (TException e) {
- throw new SyncConnectionException(
- "Can not finish sync process because sync receiver has broken down.", e);
+ if (syncSchemaLogFile.exists()) {
+ try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
+ return Integer.parseInt(br.readLine());
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
+ }
+ return 0;
+ }
+
+ private void writeSyncSchemaPos(File syncSchemaLogFile) {
+ try {
+ if (syncSchemaLogFile.exists()) {
+ try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
+ br.write(Integer.toString(schemaFileLinePos));
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
}
- return successOrNot;
}
/**
@@ -490,7 +579,7 @@ public class DataTransferManager implements IDataTransferManager {
lockFile.createNewFile();
}
if (!lockInstance(config.getLockFilePath())) {
- logger.error("Sync client is running.");
+ logger.error("Sync client is already running.");
System.exit(1);
}
}
@@ -498,9 +587,9 @@ public class DataTransferManager implements IDataTransferManager {
/**
* Try to lock lockfile. if failed, it means that sync client has benn started.
*
- * @param lockFile path of lockfile
+ * @param lockFile path of lock file
*/
- private static boolean lockInstance(final String lockFile) {
+ private boolean lockInstance(final String lockFile) {
try {
final File file = new File(lockFile);
final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
@@ -510,7 +599,6 @@ public class DataTransferManager implements IDataTransferManager {
try {
fileLock.release();
randomAccessFile.close();
- FileUtils.forceDelete(file);
} catch (Exception e) {
logger.error("Unable to remove lock file: {}", lockFile, e);
}
@@ -528,11 +616,15 @@ public class DataTransferManager implements IDataTransferManager {
private static final DataTransferManager INSTANCE = new DataTransferManager();
}
- public void setConfig(SyncSenderConfig config) {
- this.config = config;
+ private File getSyncLogFile() {
+ return new File(config.getSenderPath(), Constans.SYNC_LOG_NAME);
+ }
+
+ private File getCurrentLogFile() {
+ return new File(config.getSenderPath(), Constans.CURRENT_SYNC_LOG_NAME);
}
- public List<String> getSchema() {
- return schema;
+ public void setConfig(SyncSenderConfig config) {
+ DataTransferManager.config = config;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index 3941a90..82b0d00 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -18,9 +18,11 @@
*/
package org.apache.iotdb.db.sync.sender.transfer;
+import java.io.File;
import java.io.IOException;
import java.util.Set;
import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.thrift.TException;
/**
* SyncSender defines the methods of a sender in sync module.
@@ -45,22 +47,20 @@ public interface IDataTransferManager {
/**
* Make file snapshots before sending files.
*/
- Set<String> makeFileSnapshot(Set<String> validFiles) throws IOException;
+ Set<String> makeFileSnapshot(Set<String> validFiles);
/**
* Send schema file to receiver.
*/
- void syncSchema() throws SyncConnectionException;
+ void syncSchema() throws SyncConnectionException, TException;
- /**
- * For all valid files, send it to receiver side and load these data in receiver.
- */
- void syncAllData() throws SyncConnectionException;
+ void syncDeletedFilesName(String sgName, Set<File> deletedFilesName)
+ throws SyncConnectionException;
/**
- * Close the socket after sending files.
+ * For all valid files, send it to receiver side and load these data in receiver.
*/
- boolean afterSynchronization() throws SyncConnectionException;
+ void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName) throws SyncConnectionException;
/**
* Execute a sync task.
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 5a056a9..8e7a9e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -29,9 +29,6 @@ public class SyncUtils {
private static final String IP_SEPARATOR = "\\.";
- private static String[] snapshotPaths = SyncSenderDescriptor.getInstance()
- .getConfig().getSnapshotPaths();
-
private SyncUtils() {
}
@@ -41,40 +38,20 @@ public class SyncUtils {
* sender.
*/
public static String getSnapshotFilePath(String filePath) {
- String[] name;
- String relativeFilePath;
- String os = System.getProperty("os.name");
- if (os.toLowerCase().startsWith("windows")) {
- name = filePath.split(File.separator + File.separator);
- relativeFilePath = name[name.length - 2] + File.separator + name[name.length - 1];
- } else {
- name = filePath.split(File.separator);
- relativeFilePath = name[name.length - 2] + File.separator + name[name.length - 1];
- }
- String bufferWritePath = name[0];
- for (int i = 1; i < name.length - 2; i++) {
- bufferWritePath = bufferWritePath + File.separatorChar + name[i];
- }
- for (String snapshotPath : snapshotPaths) {
- if (snapshotPath.startsWith(bufferWritePath)) {
- if (!new File(snapshotPath).exists()) {
- new File(snapshotPath).mkdir();
- }
- if (snapshotPath.length() > 0
- && snapshotPath.charAt(snapshotPath.length() - 1) != File.separatorChar) {
- snapshotPath = snapshotPath + File.separatorChar;
- }
- return snapshotPath + relativeFilePath;
- }
+ String relativeFilePath =
+ new File(filePath).getParent() + File.separator + new File(filePath).getName();
+ String snapshotDir = SyncSenderDescriptor.getInstance().getConfig().getSnapshotPath();
+ if (!new File(snapshotDir).exists()) {
+ new File(snapshotDir).mkdirs();
}
- return null;
+ return new File(snapshotDir, relativeFilePath).getAbsolutePath();
}
/**
* Verify sending list is empty or not It's used by sync sender.
*/
- public static boolean isEmpty(Map<String, Set<String>> sendingFileList) {
- for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) {
+ public static boolean isEmpty(Map<String, Set<File>> sendingFileList) {
+ for (Entry<String, Set<File>> entry : sendingFileList.entrySet()) {
if (!entry.getValue().isEmpty()) {
return false;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
index ffb1f2b..7a06281 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
@@ -147,7 +147,7 @@ public class SingleClientSyncTest {
"insert into root.test.d0(timestamp,s1) values(3000,'1309')",
"insert into root.test.d1.g0(timestamp,s0) values(400,1050)", "merge", "flush",};
private boolean testFlag = Constant.testFlag;
- private static final String SYNC_CLIENT = Constans.SYNC_CLIENT;
+ private static final String SYNC_CLIENT = Constans.SYNC_SENDER;
private static final Logger logger = LoggerFactory.getLogger(SingleClientSyncTest.class);
public static void main(String[] args) throws Exception {
@@ -159,7 +159,7 @@ public class SingleClientSyncTest {
}
public void setConfig() {
- config.setUuidPath(
+ config.setSenderPath(
config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.UUID_FILE_NAME);
config.setLastFileInfo(
config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.LAST_LOCAL_FILE_NAME);
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
index 4c8356a..322dfe0 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
public class SyncFileManagerTest {
- private static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_CLIENT + File.separator;
+ private static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_SENDER + File.separator;
private static final String LAST_FILE_INFO_TEST =
POST_BACK_DIRECTORY_TEST + Constans.LAST_LOCAL_FILE_NAME;
private static final String SENDER_FILE_PATH_TEST = POST_BACK_DIRECTORY_TEST + "data";
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index fec6079..6fc394a 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -22,17 +22,18 @@ typedef i32 int
typedef i16 short
typedef i64 long
-enum SyncDataStatus {
- SUCCESS_STATUS,
- FINISH_STATUS,
- PROCESSING_STATUS
+enum ResultStatus {
+ SUCCESS,
+ FAILURE,
+ BUSY
}
service SyncService{
- bool checkIdentity(1:string uuid, 2:string address)
- string syncSchema(1:string md5, 2:binary buff, 3:SyncDataStatus status)
- string syncData(1:string md5, 2:list<string> filename, 3:binary buff, 4:SyncDataStatus status)
- bool load()
- void cleanUp()
- bool init(1:string storageGroupName)
+ ResultStatus checkIdentity(1:string address)
+ ResultStatus init(1:string storageGroupName)
+ ResultStatus syncDeletedFileName(1:string fileName)
+ ResultStatus initSyncData(1:string filename)
+ ResultStatus syncData(1:binary buff)
+ string checkDataMD5(1:string md5)
+ void endSync()
}
\ No newline at end of file