You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/22 03:14:14 UTC
[incubator-iotdb] 03/05: complete all sender module
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 7b57574ce0e47b6457d2cfe6de086f0d30bba029
Author: lta <li...@163.com>
AuthorDate: Wed Aug 21 17:03:52 2019 +0800
complete all sender module
---
.../receiver/transfer/SyncServiceImplBackup.java | 736 ---------------------
.../apache/iotdb/db/sync/sender/conf/Constans.java | 2 +-
.../db/sync/sender/manage/ISyncFileManager.java | 4 -
.../db/sync/sender/manage/SyncFileManager.java | 9 -
.../sender/recover/ISyncSenderLogAnalyzer.java | 6 +-
.../db/sync/sender/recover/ISyncSenderLogger.java | 8 -
.../sync/sender/recover/SyncSenderLogAnalyzer.java | 114 ++++
.../db/sync/sender/recover/SyncSenderLogger.java | 50 +-
.../sync/sender/transfer/DataTransferManager.java | 276 ++++----
.../sync/sender/transfer/IDataTransferManager.java | 10 +-
.../java/org/apache/iotdb/db/utils/SyncUtils.java | 7 +-
11 files changed, 255 insertions(+), 967 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
deleted file mode 100644
index 6f2f754..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
+++ /dev/null
@@ -1,736 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied. See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// */
-//package org.apache.iotdb.db.sync.receiver.transfer;
-//
-//import java.io.BufferedReader;
-//import java.io.File;
-//import java.io.FileInputStream;
-//import java.io.FileNotFoundException;
-//import java.io.FileOutputStream;
-//import java.io.IOException;
-//import java.math.BigInteger;
-//import java.nio.ByteBuffer;
-//import java.nio.channels.FileChannel;
-//import java.security.MessageDigest;
-//import java.util.ArrayList;
-//import java.util.Collections;
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.Iterator;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Map.Entry;
-//import java.util.Set;
-//import org.apache.commons.io.FileUtils;
-//import org.apache.commons.lang3.StringUtils;
-//import org.apache.iotdb.db.concurrent.ThreadName;
-//import org.apache.iotdb.db.conf.IoTDBConfig;
-//import org.apache.iotdb.db.conf.IoTDBDescriptor;
-//import org.apache.iotdb.db.conf.directories.DirectoryManager;
-//import org.apache.iotdb.db.engine.StorageEngine;
-//import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-//import org.apache.iotdb.db.exception.MetadataErrorException;
-//import org.apache.iotdb.db.exception.PathErrorException;
-//import org.apache.iotdb.db.exception.ProcessorException;
-//import org.apache.iotdb.db.exception.StorageEngineException;
-//import org.apache.iotdb.db.metadata.MManager;
-//import org.apache.iotdb.db.metadata.MetadataConstant;
-//import org.apache.iotdb.db.metadata.MetadataOperationType;
-//import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
-//import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-//import org.apache.iotdb.db.sync.sender.conf.Constans;
-//import org.apache.iotdb.db.utils.FilePathUtils;
-//import org.apache.iotdb.db.utils.SyncUtils;
-//import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
-//import org.apache.iotdb.service.sync.thrift.SyncService;
-//import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-//import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-//import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-//import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-//import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-//import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
-//import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-//import org.apache.iotdb.tsfile.read.common.Field;
-//import org.apache.iotdb.tsfile.read.common.Path;
-//import org.apache.iotdb.tsfile.read.common.RowRecord;
-//import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-//import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//public class SyncServiceImplBackup implements SyncService.Iface {
-//
-// private static final Logger logger = LoggerFactory.getLogger(SyncServiceImplBackup.class);
-//
-// private static final StorageEngine STORAGE_GROUP_MANAGER = StorageEngine.getInstance();
-// /**
-// * Metadata manager
-// **/
-// private static final MManager metadataManger = MManager.getInstance();
-//
-// private static final String SYNC_SERVER = Constans.SYNC_RECEIVER;
-//
-// private ThreadLocal<String> uuid = new ThreadLocal<>();
-// /**
-// * String means storage group,List means the set of new files(path) in local IoTDB and String
-// * means path of new Files
-// **/
-// private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>();
-// /**
-// * Map String1 means timeseries String2 means path of new Files, long means startTime
-// **/
-// private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>();
-// /**
-// * Map String1 means timeseries String2 means path of new Files, long means endTime
-// **/
-// private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>();
-//
-// /**
-// * Total num of files that needs to be loaded
-// */
-// private ThreadLocal<Integer> fileNum = new ThreadLocal<>();
-//
-// /**
-// * IoTDB config
-// **/
-// private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-//
-// /**
-// * IoTDB data directory
-// **/
-// private String baseDir = config.getBaseDir();
-//
-// /**
-// * IoTDB multiple bufferWrite directory
-// **/
-// private String[] bufferWritePaths = config.getDataDirs();
-//
-// /**
-// * The path to store metadata file of sender
-// */
-// private ThreadLocal<String> schemaFromSenderPath = new ThreadLocal<>();
-//
-// /**
-// * Sync folder path of server
-// **/
-// private String syncFolderPath;
-//
-// /**
-// * Sync data path of server
-// */
-// private String syncDataPath;
-//
-// /**
-// * Init threadLocal variable and delete old useless files.
-// */
-// @Override
-// public boolean init(String storageGroup) {
-// logger.info("Sync process starts to receive data of storage group {}", storageGroup);
-// fileNum.set(0);
-// fileNodeMap.set(new HashMap<>());
-// fileNodeStartTime.set(new HashMap<>());
-// fileNodeEndTime.set(new HashMap<>());
-// try {
-// FileUtils.deleteDirectory(new File(syncDataPath));
-// } catch (IOException e) {
-// logger.error("cannot delete directory {} ", syncFolderPath);
-// return false;
-// }
-// for (String bufferWritePath : bufferWritePaths) {
-// bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
-// String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
-// File backupDirectory = new File(backupPath, this.uuid.get());
-// if (backupDirectory.exists() && backupDirectory.list().length != 0) {
-// try {
-// FileUtils.deleteDirectory(backupDirectory);
-// } catch (IOException e) {
-// logger.error("cannot delete directory {} ", syncFolderPath);
-// return false;
-// }
-// }
-// }
-// return true;
-// }
-//
-// /**
-// * Verify IP address of sender
-// */
-// @Override
-// public boolean checkIdentity(String uuid, String ipAddress) {
-// Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
-// this.uuid.set(uuid);
-// initPath();
-// return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
-// }
-//
-// /**
-// * Init file path and clear data if last sync process failed.
-// */
-// private void initPath() {
-// baseDir = FilePathUtils.regularizePath(baseDir);
-// syncFolderPath = baseDir + SYNC_SERVER + File.separatorChar + this.uuid.get();
-// syncDataPath = syncFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
-// schemaFromSenderPath
-// .set(syncFolderPath + File.separator + MetadataConstant.METADATA_LOG);
-// }
-//
-// /**
-// * Acquire schema from sender
-// *
-// * @param status: FINIFSH_STATUS, SUCCESS_STATUS or PROCESSING_STATUS. status = FINISH_STATUS :
-// * finish receiving schema file, start to sync schema. status = PROCESSING_STATUS : the schema
-// * file has not received completely.SUCCESS_STATUS: load metadata.
-// */
-// @Override
-// public String syncSchema(String md5, ByteBuffer schema, SyncDataStatus status) {
-// String md5OfReceiver = Boolean.toString(Boolean.TRUE);
-// if (status == SyncDataStatus.SUCCESS_STATUS) {
-// /** sync metadata, include storage group and timeseries **/
-// return Boolean.toString(loadMetadata());
-// } else if (status == SyncDataStatus.PROCESSING_STATUS) {
-// File file = new File(schemaFromSenderPath.get());
-// if (!file.getParentFile().exists()) {
-// try {
-// file.getParentFile().mkdirs();
-// file.createNewFile();
-// } catch (IOException e) {
-// logger.error("Cannot make schema file {}.", file.getPath(), e);
-// md5OfReceiver = Boolean.toString(Boolean.FALSE);
-// }
-// }
-// try (FileOutputStream fos = new FileOutputStream(file, true);
-// FileChannel channel = fos.getChannel()) {
-// channel.write(schema);
-// } catch (Exception e) {
-// logger.error("Cannot insert data to file {}.", file.getPath(), e);
-// md5OfReceiver = Boolean.toString(Boolean.FALSE);
-// }
-// } else {
-// try (FileInputStream fis = new FileInputStream(schemaFromSenderPath.get())) {
-// MessageDigest md = MessageDigest.getInstance("MD5");
-// byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-// int n;
-// while ((n = fis.read(buffer)) != -1) {
-// md.update(buffer, 0, n);
-// }
-// md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
-// if (!md5.equals(md5OfReceiver)) {
-// FileUtils.forceDelete(new File(schemaFromSenderPath.get()));
-// }
-// } catch (Exception e) {
-// logger.error("Receiver cannot generate md5 {}", schemaFromSenderPath.get(), e);
-// }
-// }
-// return md5OfReceiver;
-// }
-//
-// /**
-// * Load metadata from sender
-// */
-// private boolean loadMetadata() {
-// if (new File(schemaFromSenderPath.get()).exists()) {
-// try (BufferedReader br = new BufferedReader(
-// new java.io.FileReader(schemaFromSenderPath.get()))) {
-// String metadataOperation;
-// while ((metadataOperation = br.readLine()) != null) {
-// operation(metadataOperation);
-// }
-// } catch (FileNotFoundException e) {
-// logger.error("Cannot read the file {}.",
-// schemaFromSenderPath.get(), e);
-// return false;
-// } catch (IOException e) {
-// /** multiple insert schema, ignore it **/
-// } catch (Exception e) {
-// logger.error("Parse metadata operation failed.", e);
-// return false;
-// }
-// }
-// return true;
-// }
-//
-// /**
-// * Operate metadata operation in MManager
-// *
-// * @param cmd metadata operation
-// */
-// private void operation(String cmd)
-// throws PathErrorException, IOException, MetadataErrorException {
-// String[] args = cmd.trim().split(",");
-// switch (args[0]) {
-// case MetadataOperationType.ADD_PATH_TO_MTREE:
-// Map<String, String> props;
-// String[] kv;
-// props = new HashMap<>(args.length - 5 + 1, 1);
-// for (int k = 5; k < args.length; k++) {
-// kv = args[k].split("=");
-// props.put(kv[0], kv[1]);
-// }
-// metadataManger.addPathToMTree(new Path(args[1]), TSDataType.deserialize(Short.valueOf(args[2])),
-// TSEncoding.deserialize(Short.valueOf(args[3])),
-// CompressionType.deserialize(Short.valueOf(args[4])),
-// props);
-// break;
-// case MetadataOperationType.DELETE_PATH_FROM_MTREE:
-// metadataManger.deletePaths(Collections.singletonList(new Path(args[1])));
-// break;
-// case MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE:
-// metadataManger.setStorageLevelToMTree(args[1]);
-// break;
-// case MetadataOperationType.ADD_A_PTREE:
-// metadataManger.addAPTree(args[1]);
-// break;
-// case MetadataOperationType.ADD_A_PATH_TO_PTREE:
-// metadataManger.addPathToPTree(args[1]);
-// break;
-// case MetadataOperationType.DELETE_PATH_FROM_PTREE:
-// metadataManger.deletePathFromPTree(args[1]);
-// break;
-// case MetadataOperationType.LINK_MNODE_TO_PTREE:
-// metadataManger.linkMNodeToPTree(args[1], args[2]);
-// break;
-// case MetadataOperationType.UNLINK_MNODE_FROM_PTREE:
-// metadataManger.unlinkMNodeFromPTree(args[1], args[2]);
-// break;
-// default:
-// logger.error("Unrecognizable command {}", cmd);
-// }
-// }
-//
-// /**
-// * Start receiving tsfile from sender
-// *
-// * @param status status = SUCCESS_STATUS : finish receiving one tsfile status = PROCESSING_STATUS
-// * : tsfile has not received completely.
-// */
-// @Override
-// public String syncData(String md5OfSender, List<String> filePathSplit,
-// ByteBuffer dataToReceive, SyncDataStatus status) {
-// String md5OfReceiver = Boolean.toString(Boolean.TRUE);
-// FileChannel channel;
-// /** Recombination File Path **/
-// String filePath = StringUtils.join(filePathSplit, File.separatorChar);
-// syncDataPath = FilePathUtils.regularizePath(syncDataPath);
-// filePath = syncDataPath + filePath;
-// if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data stream to add
-// File file = new File(filePath);
-// if (!file.getParentFile().exists()) {
-// try {
-// file.getParentFile().mkdirs();
-// file.createNewFile();
-// } catch (IOException e) {
-// logger.error("cannot make file {}", file.getPath(), e);
-// md5OfReceiver = Boolean.toString(Boolean.FALSE);
-// }
-// }
-// try (FileOutputStream fos = new FileOutputStream(file, true)) {// append new data
-// channel = fos.getChannel();
-// channel.write(dataToReceive);
-// } catch (IOException e) {
-// logger.error("cannot insert data to file {}", file.getPath(), e);
-// md5OfReceiver = Boolean.toString(Boolean.FALSE);
-//
-// }
-// } else { // all data in the same file has received successfully
-// try (FileInputStream fis = new FileInputStream(filePath)) {
-// MessageDigest md = MessageDigest.getInstance("MD5");
-// byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-// int n;
-// while ((n = fis.read(buffer)) != -1) {
-// md.update(buffer, 0, n);
-// }
-// md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
-// if (md5OfSender.equals(md5OfReceiver)) {
-// fileNum.set(fileNum.get() + 1);
-//
-// logger.info(String.format("Receiver has received %d files from sender", fileNum.get()));
-// } else {
-// FileUtils.forceDelete(new File(filePath));
-// }
-// } catch (Exception e) {
-// logger.error("Receiver cannot generate md5 {}", filePath, e);
-// }
-// }
-// return md5OfReceiver;
-// }
-//
-//
-// @Override
-// public boolean load() {
-// try {
-// getFileNodeInfo();
-// loadData();
-// } catch (Exception e) {
-// logger.error("fail to load data", e);
-// return false;
-// }
-// return true;
-// }
-//
-// /**
-// * Get all tsfiles' info which are sent from sender, it is preparing for merging these data
-// */
-// public void getFileNodeInfo() throws IOException {
-// File dataFileRoot = new File(syncDataPath);
-// File[] files = dataFileRoot.listFiles();
-// int processedNum = 0;
-// for (File storageGroupPB : files) {
-// List<String> filesPath = new ArrayList<>();
-// File[] filesSG = storageGroupPB.listFiles();
-// for (File fileTF : filesSG) { // fileTF means TsFiles
-// Map<String, Long> startTimeMap = new HashMap<>();
-// Map<String, Long> endTimeMap = new HashMap<>();
-// TsFileSequenceReader reader = null;
-// try {
-// reader = new TsFileSequenceReader(fileTF.getPath());
-// Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-// Iterator<String> it = deviceIdMap.keySet().iterator();
-// while (it.hasNext()) {
-// String key = it.next();
-// TsDeviceMetadataIndex device = deviceIdMap.get(key);
-// startTimeMap.put(key, device.getStartTime());
-// endTimeMap.put(key, device.getEndTime());
-// }
-// } catch (IOException e) {
-// logger.error("Unable to read tsfile {}", fileTF.getPath());
-// throw new IOException(e);
-// } finally {
-// try {
-// if (reader != null) {
-// reader.close();
-// }
-// } catch (IOException e) {
-// logger.error("Cannot close tsfile stream {}", fileTF.getPath());
-// throw new IOException(e);
-// }
-// }
-// fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
-// fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap);
-// filesPath.add(fileTF.getPath());
-// processedNum++;
-// logger.info(String
-// .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get()));
-// fileNodeMap.get().put(storageGroupPB.getName(), filesPath);
-// }
-// }
-// }
-//
-//
-// /**
-// * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group
-// * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the
-// * possibility of updating historical data.
-// */
-// public void loadData() throws StorageEngineException {
-// syncDataPath = FilePathUtils.regularizePath(syncDataPath);
-// int processedNum = 0;
-// for (String storageGroup : fileNodeMap.get().keySet()) {
-// List<String> filesPath = fileNodeMap.get().get(storageGroup);
-// /** before load external tsFile, it is necessary to order files in the same storage group **/
-// Collections.sort(filesPath, (o1, o2) -> {
-// Map<String, Long> startTimePath1 = fileNodeStartTime.get().get(o1);
-// Map<String, Long> endTimePath2 = fileNodeEndTime.get().get(o2);
-// for (Entry<String, Long> entry : endTimePath2.entrySet()) {
-// if (startTimePath1.containsKey(entry.getKey())) {
-// if (startTimePath1.get(entry.getKey()) > entry.getValue()) {
-// return 1;
-// } else {
-// return -1;
-// }
-// }
-// }
-// return 0;
-// });
-//
-// for (String path : filesPath) {
-// // get startTimeMap and endTimeMap
-// Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
-// Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
-//
-// // create a new fileNode
-// String header = syncDataPath;
-// String relativePath = path.substring(header.length());
-// TsFileResource fileNode = new TsFileResource(
-// new File(DirectoryManager.getInstance().getNextFolderIndexForSequenceFile() +
-// File.separator + relativePath), startTimeMap, endTimeMap
-// );
-// // call interface of load external file
-// try {
-// if (!STORAGE_GROUP_MANAGER.appendFileToStorageGroupProcessor(storageGroup, fileNode, path)) {
-// // it is a file with unsequence data
-// if (config.isUpdateHistoricalDataPossibility()) {
-// loadOldData(path);
-// } else {
-// List<String> overlapFiles = STORAGE_GROUP_MANAGER.getOverlapFiles(
-// storageGroup,
-// fileNode, uuid.get());
-// if (overlapFiles.isEmpty()) {
-// loadOldData(path);
-// } else {
-// loadOldData(path, overlapFiles);
-// }
-// }
-// }
-// } catch (StorageEngineException | IOException | ProcessorException e) {
-// logger.error("Can not load external file {}", path);
-// throw new StorageEngineException(e);
-// }
-// processedNum++;
-// logger.info(String
-// .format("Merging files has completed : %d/%d", processedNum, fileNum.get()));
-// }
-// }
-// }
-//
-// /**
-// * Insert all data in the tsfile into IoTDB.
-// */
-// public void loadOldData(String filePath) throws IOException, ProcessorException {
-// Set<String> timeseriesSet = new HashSet<>();
-// TsFileSequenceReader reader = null;
-// QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-// try {
-// /** use tsfile reader to get data **/
-// reader = new TsFileSequenceReader(filePath);
-// Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-// Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet()
-// .iterator();
-// while (entryIterator.hasNext()) {
-// Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next();
-// String deviceId = deviceMIEntry.getKey();
-// TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue();
-// TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-// List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList();
-// timeseriesSet.clear();
-// /** firstly, get all timeseries in the same device **/
-// for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) {
-// List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData
-// .getChunkMetaDataList();
-// for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
-// String measurementUID = chunkMetaData.getMeasurementUid();
-// measurementUID = deviceId + "." + measurementUID;
-// timeseriesSet.add(measurementUID);
-// }
-// }
-// /** Secondly, use tsFile Reader to form InsertPlan **/
-// ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
-// List<Path> paths = new ArrayList<>();
-// paths.clear();
-// for (String timeseries : timeseriesSet) {
-// paths.add(new Path(timeseries));
-// }
-// QueryExpression queryExpression = QueryExpression.create(paths, null);
-// QueryDataSet queryDataSet = readTsFile.query(queryExpression);
-// while (queryDataSet.hasNext()) {
-// RowRecord record = queryDataSet.next();
-// List<Field> fields = record.getFields();
-// List<String> measurementList = new ArrayList<>();
-// List<String> insertValues = new ArrayList<>();
-// for (int i = 0; i < fields.size(); i++) {
-// Field field = fields.get(i);
-// if (!field.isNull()) {
-// measurementList.add(paths.get(i).getMeasurement());
-// if (fields.get(i).getDataType() == TSDataType.TEXT) {
-// insertValues.add(String.format("'%s'", field.toString()));
-// } else {
-// insertValues.add(String.format("%s", field.toString()));
-// }
-// }
-// }
-// if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
-// measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
-// throw new IOException("Inserting series data to IoTDB engine has failed.");
-// }
-// }
-// }
-// } catch (IOException e) {
-// logger.error("Can not parse tsfile into SQL", e);
-// throw new IOException(e);
-// } catch (ProcessorException e) {
-// logger.error("Meet error while processing non-query.");
-// throw new ProcessorException(e);
-// } finally {
-// try {
-// if (reader != null) {
-// reader.close();
-// }
-// } catch (IOException e) {
-// logger.error("Cannot close file stream {}", filePath, e);
-// }
-// }
-// }
-//
-// /**
-// * Insert those valid data in the tsfile into IoTDB
-// *
-// * @param overlapFiles:files which are conflict with the sync file
-// */
-// public void loadOldData(String filePath, List<String> overlapFiles)
-// throws IOException, ProcessorException {
-// Set<String> timeseriesList = new HashSet<>();
-// QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-// Map<String, ReadOnlyTsFile> tsfilesReaders = openReaders(filePath, overlapFiles);
-// try {
-// TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
-// Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-// Iterator<String> it = deviceIdMap.keySet().iterator();
-// while (it.hasNext()) {
-// String deviceID = it.next();
-// TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID);
-// TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-// List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
-// .getChunkGroupMetaDataList();
-// timeseriesList.clear();
-// /** firstly, get all timeseries in the same device **/
-// for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-// List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
-// for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) {
-// String measurementUID = timeSeriesChunkMetaData.getMeasurementUid();
-// measurementUID = deviceID + "." + measurementUID;
-// timeseriesList.add(measurementUID);
-// }
-// }
-// reader.close();
-//
-// /** secondly, use tsFile Reader to form SQL **/
-// ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
-// List<Path> paths = new ArrayList<>();
-// /** compare data with one timeseries in a round to get valid data **/
-// for (String timeseries : timeseriesList) {
-// paths.clear();
-// paths.add(new Path(timeseries));
-// Set<InsertPlan> originDataPoints = new HashSet<>();
-// QueryExpression queryExpression = QueryExpression.create(paths, null);
-// QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
-// Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet, paths, deviceID);
-//
-// /** get all data with the timeseries in all overlap files. **/
-// for (String overlapFile : overlapFiles) {
-// ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
-// QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(queryExpression);
-// originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap, paths, deviceID));
-// }
-//
-// /** If there has no overlap data with the timeseries, inserting all data in the sync file **/
-// if (originDataPoints.isEmpty()) {
-// for (InsertPlan insertPlan : newDataPoints) {
-// if (insertExecutor.insert(insertPlan)) {
-// throw new IOException("Inserting series data to IoTDB engine has failed.");
-// }
-// }
-// } else {
-// /** Compare every data to get valid data **/
-// for (InsertPlan insertPlan : newDataPoints) {
-// if (!originDataPoints.contains(insertPlan)) {
-// if (insertExecutor.insert(insertPlan)) {
-// throw new IOException("Inserting series data to IoTDB engine has failed.");
-// }
-// }
-// }
-// }
-// }
-// }
-// } catch (IOException e) {
-// logger.error("Can not parse tsfile into SQL", e);
-// throw new IOException(e);
-// } catch (ProcessorException e) {
-// logger.error("Meet error while processing non-query.", e);
-// throw new ProcessorException(e);
-// } finally {
-// try {
-// closeReaders(tsfilesReaders);
-// } catch (IOException e) {
-// logger.error("Cannot close file stream {}", filePath, e);
-// }
-// }
-// }
-//
-// private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet, List<Path> paths, String deviceID) throws IOException {
-// Set<InsertPlan> plans = new HashSet<>();
-// while (queryDataSet.hasNext()) {
-// RowRecord record = queryDataSet.next();
-// List<Field> fields = record.getFields();
-// /** get all data with the timeseries in the sync file **/
-// for (int i = 0; i < fields.size(); i++) {
-// Field field = fields.get(i);
-// String[] measurementList = new String[1];
-// if (!field.isNull()) {
-// measurementList[0] = paths.get(i).getMeasurement();
-// InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(),
-// measurementList, new String[]{field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString())
-// : field.toString()});
-// plans.add(insertPlan);
-// }
-// }
-// }
-// return plans;
-// }
-//
-// /**
-// * Open all tsfile reader and cache
-// */
-// private Map<String, ReadOnlyTsFile> openReaders(String filePath, List<String> overlapFiles)
-// throws IOException {
-// Map<String, ReadOnlyTsFile> tsfileReaders = new HashMap<>();
-// tsfileReaders.put(filePath, new ReadOnlyTsFile(new TsFileSequenceReader(filePath)));
-// for (String overlapFile : overlapFiles) {
-// tsfileReaders.put(overlapFile, new ReadOnlyTsFile(new TsFileSequenceReader(overlapFile)));
-// }
-// return tsfileReaders;
-// }
-//
-// /**
-// * Close all tsfile reader
-// */
-// private void closeReaders(Map<String, ReadOnlyTsFile> readers) throws IOException {
-// for (ReadOnlyTsFile tsfileReader : readers.values()) {
-// tsfileReader.close();
-// }
-// }
-//
-// /**
-// * Release threadLocal variable resources
-// */
-// @Override
-// public void cleanUp() {
-// uuid.remove();
-// fileNum.remove();
-// fileNodeMap.remove();
-// fileNodeStartTime.remove();
-// fileNodeEndTime.remove();
-// schemaFromSenderPath.remove();
-// try {
-// FileUtils.deleteDirectory(new File(syncFolderPath));
-// } catch (IOException e) {
-// logger.error("can not delete directory {}", syncFolderPath, e);
-// }
-// logger.info("Synchronization has finished!");
-// }
-//
-// public Map<String, List<String>> getFileNodeMap() {
-// return fileNodeMap.get();
-// }
-//
-// public void setFileNodeMap(Map<String, List<String>> fileNodeMap) {
-// this.fileNodeMap.set(fileNodeMap);
-// }
-//
-//}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index 05b67c0..dfe3c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -30,9 +30,9 @@ public class Constans {
public static final String LOCK_FILE_NAME = "sync_lock";
public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
+ public static final String CURRENT_LOCAL_FILE_NAME = "current_local_files.txt";
public static final String DATA_SNAPSHOT_NAME = "snapshot";
public static final String SYNC_LOG_NAME = "sync.log";
- public static final String CURRENT_SYNC_LOG_NAME = "current_sync.log";
public static final String MESSAGE_DIGIT_NAME = "MD5";
public static final String SYNC_DIR_NAME_SEPARATOR = "_";
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index d4f7e33..1a529be 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.db.sync.sender.manage;
import java.io.File;
import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
public interface ISyncFileManager {
@@ -30,6 +28,4 @@ public interface ISyncFileManager {
void getLastLocalFiles(File lastLocalFile) throws IOException;
void getValidFiles(String dataDir) throws IOException;
-
- void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index 20f1dca..c08b64f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -125,15 +125,6 @@ public class SyncFileManager implements ISyncFileManager {
}
}
- @Override
- public void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles) {
-
- }
-
- public Map<String, Set<File>> getCurrentSealedLocalFilesMap() {
- return currentSealedLocalFilesMap;
- }
-
public Map<String, Set<File>> getLastLocalFilesMap() {
return lastLocalFilesMap;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index 3b33f78..53cb54f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -22,12 +22,12 @@ import java.util.Set;
public interface ISyncSenderLogAnalyzer {
- void loadLastLocalFiles(Set<String> set);
+ void loadLastLocalFiles(Set<String> lastLocalFiles);
- void loadLogger(Set<String> set);
+ void loadLogger(Set<String> deletedFiles, Set<String> newFiles);
void recover();
- void clearLogger();
+ void clearLogger(Set<String> currentLocalFiles);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
index 15df693..d1cdf9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -23,22 +23,14 @@ import java.io.IOException;
public interface ISyncSenderLogger {
- void startSync() throws IOException;
-
- void endSync() throws IOException;
-
void startSyncDeletedFilesName() throws IOException;
void finishSyncDeletedFileName(File file) throws IOException;
- void endSyncDeletedFilsName() throws IOException;
-
void startSyncTsFiles() throws IOException;
void finishSyncTsfile(File file) throws IOException;
- void endSyncTsFiles() throws IOException;
-
void close() throws IOException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
new file mode 100644
index 0000000..9649806
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer{
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzer.class);
+ private File currentLocalFile;
+ private File lastLocalFile;
+ private File syncLogFile;
+
+ public SyncSenderLogAnalyzer(String senderPath) {
+ this.currentLocalFile = new File(senderPath, Constans.CURRENT_LOCAL_FILE_NAME);
+ this.lastLocalFile = new File(senderPath, Constans.LAST_LOCAL_FILE_NAME);
+ this.syncLogFile = new File(senderPath, Constans.SYNC_LOG_NAME);
+ }
+
+ @Override
+ public void recover() {
+ if (currentLocalFile.exists() && !lastLocalFile.exists()) {
+ currentLocalFile.renameTo(lastLocalFile);
+ } else {
+ Set<String> lastLocalFiles = new HashSet<>();
+ Set<String> deletedFiles = new HashSet<>();
+ Set<String> newFiles = new HashSet<>();
+ loadLastLocalFiles(lastLocalFiles);
+ loadLogger(deletedFiles, newFiles);
+ lastLocalFiles.removeAll(deletedFiles);
+ lastLocalFiles.addAll(newFiles);
+ clearLogger(lastLocalFiles);
+ }
+ }
+
+ @Override
+ public void loadLastLocalFiles(Set<String> lastLocalFiles) {
+ try (BufferedReader br = new BufferedReader(new FileReader(lastLocalFile))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ lastLocalFiles.add(line);
+ }
+ } catch (IOException e) {
+ LOGGER
+ .error("Can not load last local file list from file {}", lastLocalFile.getAbsoluteFile(),
+ e);
+ }
+ }
+
+ @Override
+ public void loadLogger(Set<String> deletedFiles, Set<String> newFiles) {
+ try (BufferedReader br = new BufferedReader(new FileReader(syncLogFile))) {
+ String line;
+ int mode = 0;
+ while ((line = br.readLine()) != null) {
+ if (line.equals(SyncSenderLogger.SYNC_DELETED_FILE_NAME_START)) {
+ mode = -1;
+ } else if (line.equals(SyncSenderLogger.SYNC_TSFILE_START)) {
+ mode = 1;
+ } else {
+ if (mode == -1) {
+ deletedFiles.add(line);
+ } else if (mode == 1) {
+ newFiles.add(line);
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOGGER
+ .error("Can not load last local file list from file {}", lastLocalFile.getAbsoluteFile(),
+ e);
+ }
+ }
+
+ @Override
+ public void clearLogger(Set<String> currentLocalFiles) {
+ try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
+ for (String line : currentLocalFiles) {
+ bw.write(line);
+ bw.newLine();
+ }
+ bw.flush();
+ } catch (IOException e) {
+ LOGGER.error("Can not clear sync log {}", syncLogFile.getAbsoluteFile(), e);
+ }
+ currentLocalFile.renameTo(lastLocalFile);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
index 8171d0f..8e118d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.sync.sender.recover;
import java.io.BufferedWriter;
@@ -7,12 +25,8 @@ import java.io.IOException;
public class SyncSenderLogger implements ISyncSenderLogger {
- public static final String SYNC_START = "sync start";
- public static final String SYNC_END = "sync end";
public static final String SYNC_DELETED_FILE_NAME_START = "sync deleted file names start";
- public static final String SYNC_DELETED_FILE_NAME_END = "sync deleted file names end";
public static final String SYNC_TSFILE_START = "sync tsfile start";
- public static final String SYNC_TSFILE_END = "sync tsfile end";
private BufferedWriter bw;
public SyncSenderLogger(String filePath) throws IOException {
@@ -24,20 +38,6 @@ public class SyncSenderLogger implements ISyncSenderLogger {
}
@Override
- public void startSync() throws IOException {
- bw.write(SYNC_START);
- bw.newLine();
- bw.flush();
- }
-
- @Override
- public void endSync() throws IOException {
- bw.write(SYNC_END);
- bw.newLine();
- bw.flush();
- }
-
- @Override
public void startSyncDeletedFilesName() throws IOException {
bw.write(SYNC_DELETED_FILE_NAME_START);
bw.newLine();
@@ -52,13 +52,6 @@ public class SyncSenderLogger implements ISyncSenderLogger {
}
@Override
- public void endSyncDeletedFilsName() throws IOException {
- bw.write(SYNC_DELETED_FILE_NAME_END);
- bw.newLine();
- bw.flush();
- }
-
- @Override
public void startSyncTsFiles() throws IOException {
bw.write(SYNC_TSFILE_START);
bw.newLine();
@@ -73,13 +66,6 @@ public class SyncSenderLogger implements ISyncSenderLogger {
}
@Override
- public void endSyncTsFiles() throws IOException {
- bw.write(SYNC_TSFILE_END);
- bw.newLine();
- bw.flush();
- }
-
- @Override
public void close() throws IOException {
bw.close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index c616376..fc68960 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -1,16 +1,4 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with the License. You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
+
package org.apache.iotdb.db.sync.sender.transfer;
import java.io.BufferedReader;
@@ -18,8 +6,6 @@ import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
@@ -33,12 +19,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
@@ -50,10 +34,10 @@ import org.apache.iotdb.db.sync.sender.conf.Constans;
import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
import org.apache.iotdb.db.sync.sender.manage.SyncFileManager;
+import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
import org.apache.iotdb.db.utils.SyncUtils;
import org.apache.iotdb.service.sync.thrift.ResultStatus;
-import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.thrift.TException;
@@ -91,7 +75,7 @@ public class DataTransferManager implements IDataTransferManager {
private Map<String, Set<File>> sucessSyncedFilesMap;
- private Map<String, Set<File>> successDeleyedFilesMap;
+ private Map<String, Set<File>> successDeletedFilesMap;
private Map<String, Set<File>> lastLocalFilesMap;
@@ -102,11 +86,6 @@ public class DataTransferManager implements IDataTransferManager {
private SyncSenderLogger syncLog;
- /**
- * Key means storage group, Set means corresponding tsfiles
- **/
- private Map<String, Set<String>> validFileSnapshot = new HashMap<>();
-
private SyncFileManager syncFileManager = SyncFileManager.getInstance();
private ScheduledExecutorService executorService;
@@ -130,6 +109,53 @@ public class DataTransferManager implements IDataTransferManager {
fileSenderImpl.startTimedTask();
}
+
+ /**
+ * Verifies that the client lock file is not already locked, ensuring that only one sync client
+ * is running.
+ */
+ private void verifySingleton() throws IOException {
+ File lockFile = new File(config.getLockFilePath());
+ if (!lockFile.getParentFile().exists()) {
+ lockFile.getParentFile().mkdirs();
+ }
+ if (!lockFile.exists()) {
+ lockFile.createNewFile();
+ }
+ if (!lockInstance(config.getLockFilePath())) {
+ logger.error("Sync client is already running.");
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Try to lock the lock file. If locking fails, it means that a sync client has already been started.
+ *
+ * @param lockFile path of lock file
+ */
+ private boolean lockInstance(final String lockFile) {
+ try {
+ final File file = new File(lockFile);
+ final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+ final FileLock fileLock = randomAccessFile.getChannel().tryLock();
+ if (fileLock != null) {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ fileLock.release();
+ randomAccessFile.close();
+ } catch (Exception e) {
+ logger.error("Unable to remove lock file: {}", lockFile, e);
+ }
+ }));
+ return true;
+ }
+ } catch (Exception e) {
+ logger.error("Unable to create and/or lock file: {}", lockFile, e);
+ }
+ return false;
+ }
+
+
@Override
public void init() {
if (executorService == null) {
@@ -221,6 +247,8 @@ public class DataTransferManager implements IDataTransferManager {
// 1. Sync data
for (Entry<String, Set<File>> entry : deletedFilesMap.entrySet()) {
+ checkRecovery();
+ syncLog = new SyncSenderLogger(getSyncLogFile());
// TODO deal with the situation
try {
if (serviceClient.init(entry.getKey()) == ResultStatus.FAILURE) {
@@ -232,12 +260,10 @@ public class DataTransferManager implements IDataTransferManager {
logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
syncDeletedFilesName(entry.getKey(), entry.getValue());
syncDataFilesInOneGroup(entry.getKey(), entry.getValue());
+ clearSyncLog();
}
- // 2. Clear sync log
- clearSyncLog();
-
- } catch (SyncConnectionException | TException e) {
+ } catch (SyncConnectionException e) {
logger.error("cannot finish sync process", e);
} finally {
if (syncLog != null) {
@@ -248,23 +274,22 @@ public class DataTransferManager implements IDataTransferManager {
}
private void checkRecovery() {
-
- }
-
- public void clearSyncLog() {
-
+ new SyncSenderLogAnalyzer(config.getSenderPath()).recover();
}
@Override
- public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) {
+ public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) throws IOException {
if (deletedFilesName.isEmpty()) {
logger.info("There has no deleted files to be synced in storage group {}", sgName);
return;
}
+ syncLog.startSyncDeletedFilesName();
logger.info("Start to sync names of deleted files in storage group {}", sgName);
- for(File file:deletedFilesName){
+ for (File file : deletedFilesName) {
try {
serviceClient.syncDeletedFileName(file.getName());
+ successDeletedFilesMap.get(sgName).add(file);
+ syncLog.finishSyncDeletedFileName(file);
} catch (TException e) {
logger.error("Can not sync deleted file name {}, skip it.", file);
}
@@ -274,27 +299,29 @@ public class DataTransferManager implements IDataTransferManager {
@Override
public void syncDataFilesInOneGroup(String sgName, Set<File> toBeSyncFiles)
- throws SyncConnectionException {
- Set<String> validSnapshot = validFileSnapshot.get(sgName);
- if (validSnapshot.isEmpty()) {
+ throws SyncConnectionException, IOException {
+ if (toBeSyncFiles.isEmpty()) {
logger.info("There has no new tsfiles to be synced in storage group {}", sgName);
return;
}
+ syncLog.startSyncTsFiles();
logger.info("Sync process starts to transfer data of storage group {}", sgName);
int cnt = 0;
- for (String tsfilePath : toBeSyncFiles) {
+ for (File tsfile : toBeSyncFiles) {
cnt++;
File snapshotFile = null;
try {
- snapshotFile = makeFileSnapshot(tsfilePath);
+ snapshotFile = makeFileSnapshot(tsfile);
syncSingleFile(snapshotFile);
+ sucessSyncedFilesMap.get(sgName).add(tsfile);
+ syncLog.finishSyncTsfile(tsfile);
logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
} catch (IOException e) {
logger.info(
"Tsfile {} can not make snapshot, so skip the tsfile and continue to sync other tsfiles",
- tsfilePath, e);
+ tsfile, e);
} finally {
- if(snapshotFile != null) {
+ if (snapshotFile != null) {
snapshotFile.deleteOnExit();
}
}
@@ -302,27 +329,25 @@ public class DataTransferManager implements IDataTransferManager {
logger.info("Sync process has finished storage group {}.", sgName);
}
- private File makeFileSnapshot(String filePath) throws IOException {
- String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
- File newFile = new File(snapshotFilePath);
- if (!newFile.getParentFile().exists()) {
- newFile.getParentFile().mkdirs();
+ private File makeFileSnapshot(File file) throws IOException {
+ File snapshotFile = SyncUtils.getSnapshotFile(file);
+ if (!snapshotFile.getParentFile().exists()) {
+ snapshotFile.getParentFile().mkdirs();
}
- Path link = FileSystems.getDefault().getPath(snapshotFilePath);
- Path target = FileSystems.getDefault().getPath(filePath);
+ Path link = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
+ Path target = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
Files.createLink(link, target);
- return newFile;
+ return snapshotFile;
}
-
/**
* Transfer data of a storage group to receiver.
- *
*/
private void syncSingleFile(File snapshotFile) throws SyncConnectionException {
try {
int retryCount = 0;
MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+ serviceClient.initSyncData(snapshotFile.getName());
outer:
while (true) {
retryCount++;
@@ -341,9 +366,7 @@ public class DataTransferManager implements IDataTransferManager {
md.update(buffer, 0, dataLength);
ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
bos.reset();
- if (!Boolean.parseBoolean(serviceClient
- .syncData(null, snapshotFile.getName(), buffToSend,
- SyncDataStatus.PROCESSING_STATUS))) {
+ if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
logger.info("Receiver failed to receive data from {}, retry.",
snapshotFile.getAbsoluteFile());
continue outer;
@@ -353,14 +376,13 @@ public class DataTransferManager implements IDataTransferManager {
// the file is sent successfully
String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
- String md5OfReceiver = serviceClient.syncData(md5OfSender, snapshotFile.getName(),
- null, SyncDataStatus.FINISH_STATUS);
+ String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
if (md5OfSender.equals(md5OfReceiver)) {
logger.info("Receiver has received {} successfully.", snapshotFile.getAbsoluteFile());
break;
}
}
- } catch (IOException e) {
+ } catch (IOException | TException | NoSuchAlgorithmException e) {
throw new SyncConnectionException("Cannot sync data with receiver.", e);
}
}
@@ -389,67 +411,14 @@ public class DataTransferManager implements IDataTransferManager {
* UUID marks the identity of sender for receiver.
*/
@Override
- public boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException {
- File file = new File(uuidPath);
- /** Mark the identity of sender **/
- String uuid;
- if (!file.getParentFile().exists()) {
- file.getParentFile().mkdirs();
- }
- if (!file.exists()) {
- try (FileOutputStream out = new FileOutputStream(file)) {
- file.createNewFile();
- uuid = generateUUID();
- out.write(uuid.getBytes());
- } catch (IOException e) {
- logger.error("Cannot insert UUID to file {}", file.getPath());
- throw new IOException(e);
- }
- } else {
- try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) {
- uuid = bf.readLine();
- } catch (IOException e) {
- logger.error("Cannot read UUID from file{}", file.getPath());
- throw new IOException(e);
- }
- }
- boolean legalConnection;
+ public boolean confirmIdentity(String uuidPath) throws SyncConnectionException {
try {
- legalConnection = serviceClient.checkIdentity(uuid,
- InetAddress.getLocalHost().getHostAddress());
+ return serviceClient.checkIdentity(InetAddress.getLocalHost().getHostAddress())
+ == ResultStatus.SUCCESS;
} catch (Exception e) {
logger.error("Cannot confirm identity with receiver");
throw new SyncConnectionException(e);
}
- return legalConnection;
- }
-
- private String generateUUID() {
- return Constans.SYNC_SENDER + UUID.randomUUID().toString().replaceAll("-", "");
- }
-
- /**
- * Create snapshots for valid files.
- */
- @Override
- public Set<String> makeFileSnapshot(Set<String> validFiles) {
- Set<String> validFilesSnapshot = new HashSet<>();
- for (String filePath : validFiles) {
- try {
- String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
- File newFile = new File(snapshotFilePath);
- if (!newFile.getParentFile().exists()) {
- newFile.getParentFile().mkdirs();
- }
- Path link = FileSystems.getDefault().getPath(snapshotFilePath);
- Path target = FileSystems.getDefault().getPath(filePath);
- Files.createLink(link, target);
- validFilesSnapshot.add(snapshotFilePath);
- } catch (IOException e) {
- logger.error("Can not make snapshot for file {}", filePath);
- }
- }
- return validFilesSnapshot;
}
/**
@@ -521,14 +490,6 @@ public class DataTransferManager implements IDataTransferManager {
}
}
- private File getSchemaPosFile() {
- return new File(config.getSenderPath(), Constans.SCHEMA_POS_FILE_NAME)
- }
-
- private File getSchemaLogFile() {
- return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
- MetadataConstant.METADATA_LOG);
- }
private boolean checkMD5ForSchema(String md5OfSender) throws TException {
String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
@@ -536,7 +497,8 @@ public class DataTransferManager implements IDataTransferManager {
logger.info("Receiver has received schema successfully, retry.");
return true;
} else {
- logger.error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+ logger
+ .error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
return false;
}
}
@@ -566,49 +528,37 @@ public class DataTransferManager implements IDataTransferManager {
}
}
- /**
- * The method is to verify whether the client lock file is locked or not, ensuring that only one
- * client is running.
- */
- private void verifySingleton() throws IOException {
- File lockFile = new File(config.getLockFilePath());
- if (!lockFile.getParentFile().exists()) {
- lockFile.getParentFile().mkdirs();
- }
- if (!lockFile.exists()) {
- lockFile.createNewFile();
+ private void clearSyncLog() {
+ for (Entry<String, Set<File>> entry : lastLocalFilesMap.entrySet()) {
+ entry.getValue()
+ .removeAll(successDeletedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
+ entry.getValue()
+ .removeAll(sucessSyncedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
}
- if (!lockInstance(config.getLockFilePath())) {
- logger.error("Sync client is already running.");
- System.exit(1);
+ File currentLocalFile = getCurrentLogFile();
+ File lastLocalFile = new File(config.getLastFileInfo());
+ try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
+ for (Set<File> currentLocalFiles : lastLocalFilesMap.values()) {
+ for (File file : currentLocalFiles) {
+ bw.write(file.getAbsolutePath());
+ bw.newLine();
+ }
+ bw.flush();
+ }
+ } catch (IOException e) {
+ logger.error("Can not clear sync log {}", lastLocalFile.getAbsoluteFile(), e);
}
+ currentLocalFile.renameTo(lastLocalFile);
}
- /**
- * Try to lock lockfile. if failed, it means that sync client has benn started.
- *
- * @param lockFile path of lock file
- */
- private boolean lockInstance(final String lockFile) {
- try {
- final File file = new File(lockFile);
- final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
- final FileLock fileLock = randomAccessFile.getChannel().tryLock();
- if (fileLock != null) {
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- fileLock.release();
- randomAccessFile.close();
- } catch (Exception e) {
- logger.error("Unable to remove lock file: {}", lockFile, e);
- }
- }));
- return true;
- }
- } catch (Exception e) {
- logger.error("Unable to create and/or lock file: {}", lockFile, e);
- }
- return false;
+
+ private File getSchemaPosFile() {
+ return new File(config.getSenderPath(), Constans.SCHEMA_POS_FILE_NAME);
+ }
+
+ private File getSchemaLogFile() {
+ return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
+ MetadataConstant.METADATA_LOG);
}
private static class InstanceHolder {
@@ -621,7 +571,7 @@ public class DataTransferManager implements IDataTransferManager {
}
private File getCurrentLogFile() {
- return new File(config.getSenderPath(), Constans.CURRENT_SYNC_LOG_NAME);
+ return new File(config.getSenderPath(), Constans.CURRENT_LOCAL_FILE_NAME);
}
public void setConfig(SyncSenderConfig config) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index 82b0d00..3366b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -45,22 +45,18 @@ public interface IDataTransferManager {
boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException;
/**
- * Make file snapshots before sending files.
- */
- Set<String> makeFileSnapshot(Set<String> validFiles);
-
- /**
* Send schema file to receiver.
*/
void syncSchema() throws SyncConnectionException, TException;
void syncDeletedFilesName(String sgName, Set<File> deletedFilesName)
- throws SyncConnectionException;
+ throws SyncConnectionException, IOException;
/**
* For all valid files, send it to receiver side and load these data in receiver.
*/
- void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName) throws SyncConnectionException;
+ void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName)
+ throws SyncConnectionException, IOException;
/**
* Execute a sync task.
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 8e7a9e8..13247cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -37,14 +37,13 @@ public class SyncUtils {
* multiple directories, it's necessary to make a snapshot in the same disk. It's used by sync
* sender.
*/
- public static String getSnapshotFilePath(String filePath) {
- String relativeFilePath =
- new File(filePath).getParent() + File.separator + new File(filePath).getName();
+ public static File getSnapshotFile(File file) {
+ String relativeFilePath = file.getParent() + File.separator + file.getName();
String snapshotDir = SyncSenderDescriptor.getInstance().getConfig().getSnapshotPath();
if (!new File(snapshotDir).exists()) {
new File(snapshotDir).mkdirs();
}
- return new File(snapshotDir, relativeFilePath).getAbsolutePath();
+ return new File(snapshotDir, relativeFilePath);
}
/**