You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2020/03/17 11:59:15 UTC
[incubator-iotdb] 01/01: fix bug of add duplicated bugs
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch fix_add_duplicated_metadata_bug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit da4ba74377c850230b28a0cbf59d3526ebbb48fb
Author: lta <li...@163.com>
AuthorDate: Tue Mar 17 19:58:51 2020 +0800
fix bug of add duplicated bugs
---
.../engine/storagegroup/StorageGroupProcessor.java | 17 +++++++++++++---
.../org/apache/iotdb/db/metadata/MManager.java | 4 ++++
.../java/org/apache/iotdb/db/metadata/MTree.java | 3 +++
.../db/sync/receiver/transfer/SyncServiceImpl.java | 16 ++++++++++++++-
.../db/sync/sender/manage/SyncFileManager.java | 10 ----------
.../iotdb/db/sync/sender/transfer/SyncClient.java | 9 +++++++--
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 1 +
service-rpc/src/main/thrift/sync.thrift | 23 +++++++++++++++++-----
8 files changed, 62 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 674c0fd..6642ae1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1537,8 +1537,9 @@ public class StorageGroupProcessor {
writeLock();
mergeLock.writeLock().lock();
try {
- loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource);
- updateLatestTimeMap(newTsFileResource);
+ if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource)){
+ updateLatestTimeMap(newTsFileResource);
+ }
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1780,8 +1781,9 @@ public class StorageGroupProcessor {
* @param type load type
* @param tsFileResource tsfile resource to be loaded
* @UsedBy sync module, load external tsfile module.
+ * @return true if the file was loaded successfully, false if it had already been loaded
*/
- private void loadTsFileByType(LoadTsFileType type, File syncedTsFile,
+ private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
TsFileResource tsFileResource)
throws TsFileProcessorException, DiskSpaceInsufficientException {
File targetFile;
@@ -1793,6 +1795,10 @@ public class StorageGroupProcessor {
storageGroupName + File.separatorChar + timeRangeId + File.separator + tsFileResource
.getFile().getName());
tsFileResource.setFile(targetFile);
+ if(unSequenceFileList.contains(tsFileResource)){
+ logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
+ return false;
+ }
unSequenceFileList.add(tsFileResource);
logger.info("Load tsfile in unsequence list, move file from {} to {}",
syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
@@ -1803,6 +1809,10 @@ public class StorageGroupProcessor {
storageGroupName + File.separatorChar + timeRangeId + File.separator
+ tsFileResource.getFile().getName());
tsFileResource.setFile(targetFile);
+ if(sequenceFileTreeSet.contains(tsFileResource)){
+ logger.error("The file {} has already been loaded in sequence list", tsFileResource);
+ return false;
+ }
sequenceFileTreeSet.add(tsFileResource);
logger.info("Load tsfile in sequence list, move file from {} to {}",
syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
@@ -1840,6 +1850,7 @@ public class StorageGroupProcessor {
syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
e.getMessage()));
}
+ return true;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index afd72f3..c20032c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -451,6 +451,10 @@ public class MManager {
* @param storageGroup root.node.(node)*
*/
public void setStorageGroup(String storageGroup) throws MetadataException {
+ if (mtree.checkStorageGroupByPath(storageGroup)) {
+ throw new MetadataException(
+ String.format("Storage group <%s> has already existed.", storageGroup));
+ }
lock.writeLock().lock();
try {
if (writeToLog) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 4869b70..896b821 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -102,6 +102,9 @@ public class MTree implements Serializable {
}
MNode leaf = new LeafMNode(cur, nodeNames[nodeNames.length - 1], dataType, encoding,
compressor, props);
+ if (cur.hasChild(leaf.getName())) {
+ throw new MetadataException(String.format("The timeseries %s has already existed.", path));
+ }
cur.addChild(leaf);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 6096a53..169a689 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -29,6 +29,7 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncStatus;
import org.apache.thrift.TException;
@@ -75,8 +77,20 @@ public class SyncServiceImpl implements SyncService.Iface {
* Verify IP address of sender
*/
@Override
- public SyncStatus check(String ipAddress, String uuid) {
+ public SyncStatus check(ConfirmInfo info) {
+ String ipAddress = info.address, uuid = info.uuid;
Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
+ if (!info.version.equals(IoTDBConstant.VERSION)) {
+ return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>",
+ info.version, IoTDBConstant.VERSION));
+ }
+ if (info.partitionInterval != IoTDBDescriptor.getInstance().getConfig()
+ .getPartitionInterval()) {
+ return getErrorResult(String
+ .format("Partition interval mismatch: the sender <%d>, the receiver <%d>",
+ info.partitionInterval,
+ IoTDBDescriptor.getInstance().getConfig().getPartitionInterval()));
+ }
if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) {
senderName.set(ipAddress + SyncConstant.SYNC_DIR_NAME_SEPARATOR + uuid);
if (checkRecovery()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index 6c4c569..d6e360e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -102,16 +102,6 @@ public class SyncFileManager implements ISyncFileManager {
.equals(TsFileConstant.PATH_UPGRADE)) {
continue;
}
- try {
- if (!MManager.getInstance().getStorageGroupName(sgFolder.getName())
- .equals(sgFolder.getName())) {
- // the folder is not a sg folder
- continue;
- }
- } catch (MetadataException e) {
- // the folder is not a sg folder
- continue;
- }
allSGs.putIfAbsent(sgFolder.getName(), new HashSet<>());
currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashMap<>());
for (File timeRangeFolder : sgFolder.listFiles()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 653c6c2..e627e43 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.SyncConnectionException;
@@ -67,6 +68,7 @@ import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncStatus;
import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -286,9 +288,12 @@ public class SyncClient implements ISyncClient {
@Override
public void confirmIdentity() throws SyncConnectionException {
- try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())){
+ try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) {
+ ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(),
+ getOrCreateUUID(getUuidFile()),
+ IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION);
SyncStatus status = serviceClient
- .check(socket.getLocalAddress().getHostAddress(), getOrCreateUUID(getUuidFile()));
+ .check(info);
if (status.code != SUCCESS_CODE) {
throw new SyncConnectionException(
"The receiver rejected the synchronization task because " + status.msg);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 765c17a..64f04c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -55,6 +55,7 @@ public class FileLoaderUtils {
} else {
tsFileResource.deserialize();
}
+ tsFileResource.setClosed(true);
}
public static void updateTsFileResource(TsFileMetaData metaData, TsFileSequenceReader reader,
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index 9b20673..3e243f4 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -18,15 +18,28 @@
*/
namespace java org.apache.iotdb.service.sync.thrift
-typedef i32 int
-
struct SyncStatus{
- required i32 code
- required string msg
+ 1:required i32 code
+ 2:required string msg
+}
+
+// The sender and receiver need to check some info to confirm validity
+struct ConfirmInfo{
+ // check whether the ip of sender is in the white list of receiver.
+ 1:string address
+
+ // Sender needs to tell receiver its identity.
+ 2:string uuid
+
+ // The partition interval of sender and receiver need to be the same.
+ 3:i64 partitionInterval
+
+ // The version of sender and receiver need to be the same.
+ 4:string version
}
service SyncService{
- SyncStatus check(1:string address, 2:string uuid)
+ SyncStatus check(ConfirmInfo info)
SyncStatus startSync();
SyncStatus init(1:string storageGroupName)
SyncStatus syncDeletedFileName(1:string fileName)