You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2020/03/17 11:59:14 UTC

[incubator-iotdb] branch fix_add_duplicated_metadata_bug created (now da4ba74)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch fix_add_duplicated_metadata_bug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at da4ba74  fix bug of add duplicated bugs

This branch includes the following new commits:

     new da4ba74  fix bug of add duplicated bugs

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: fix bug of add duplicated bugs

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch fix_add_duplicated_metadata_bug
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit da4ba74377c850230b28a0cbf59d3526ebbb48fb
Author: lta <li...@163.com>
AuthorDate: Tue Mar 17 19:58:51 2020 +0800

    fix bug of add duplicated bugs
---
 .../engine/storagegroup/StorageGroupProcessor.java | 17 +++++++++++++---
 .../org/apache/iotdb/db/metadata/MManager.java     |  4 ++++
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  3 +++
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 16 ++++++++++++++-
 .../db/sync/sender/manage/SyncFileManager.java     | 10 ----------
 .../iotdb/db/sync/sender/transfer/SyncClient.java  |  9 +++++++--
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |  1 +
 service-rpc/src/main/thrift/sync.thrift            | 23 +++++++++++++++++-----
 8 files changed, 62 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 674c0fd..6642ae1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1537,8 +1537,9 @@ public class StorageGroupProcessor {
     writeLock();
     mergeLock.writeLock().lock();
     try {
-      loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource);
-      updateLatestTimeMap(newTsFileResource);
+      if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource)){
+        updateLatestTimeMap(newTsFileResource);
+      }
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1780,8 +1781,9 @@ public class StorageGroupProcessor {
    * @param type load type
    * @param tsFileResource tsfile resource to be loaded
    * @UsedBy sync module, load external tsfile module.
+   * @return load the file successfully
    */
-  private void loadTsFileByType(LoadTsFileType type, File syncedTsFile,
+  private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
       TsFileResource tsFileResource)
       throws TsFileProcessorException, DiskSpaceInsufficientException {
     File targetFile;
@@ -1793,6 +1795,10 @@ public class StorageGroupProcessor {
             storageGroupName + File.separatorChar + timeRangeId + File.separator + tsFileResource
                 .getFile().getName());
         tsFileResource.setFile(targetFile);
+        if(unSequenceFileList.contains(tsFileResource)){
+          logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
+          return false;
+        }
         unSequenceFileList.add(tsFileResource);
         logger.info("Load tsfile in unsequence list, move file from {} to {}",
             syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
@@ -1803,6 +1809,10 @@ public class StorageGroupProcessor {
                 storageGroupName + File.separatorChar + timeRangeId + File.separator
                     + tsFileResource.getFile().getName());
         tsFileResource.setFile(targetFile);
+        if(sequenceFileTreeSet.contains(tsFileResource)){
+          logger.error("The file {} has already been loaded in sequence list", tsFileResource);
+          return false;
+        }
         sequenceFileTreeSet.add(tsFileResource);
         logger.info("Load tsfile in sequence list, move file from {} to {}",
             syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
@@ -1840,6 +1850,7 @@ public class StorageGroupProcessor {
           syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
           e.getMessage()));
     }
+    return true;
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index afd72f3..c20032c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -451,6 +451,10 @@ public class MManager {
    * @param storageGroup root.node.(node)*
    */
   public void setStorageGroup(String storageGroup) throws MetadataException {
+    if (mtree.checkStorageGroupByPath(storageGroup)) {
+      throw new MetadataException(
+          String.format("Storage group <%s> has already existed.", storageGroup));
+    }
     lock.writeLock().lock();
     try {
       if (writeToLog) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 4869b70..896b821 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -102,6 +102,9 @@ public class MTree implements Serializable {
     }
     MNode leaf = new LeafMNode(cur, nodeNames[nodeNames.length - 1], dataType, encoding,
         compressor, props);
+    if (cur.hasChild(leaf.getName())) {
+      throw new MetadataException(String.format("The timeseries %s has already existed.", path));
+    }
     cur.addChild(leaf);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 6096a53..169a689 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -29,6 +29,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
 import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncStatus;
 import org.apache.thrift.TException;
@@ -75,8 +77,20 @@ public class SyncServiceImpl implements SyncService.Iface {
    * Verify IP address of sender
    */
   @Override
-  public SyncStatus check(String ipAddress, String uuid) {
+  public SyncStatus check(ConfirmInfo info) {
+    String ipAddress = info.address, uuid = info.uuid;
     Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
+    if (!info.version.equals(IoTDBConstant.VERSION)) {
+      return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>",
+          info.version, IoTDBConstant.VERSION));
+    }
+    if (info.partitionInterval != IoTDBDescriptor.getInstance().getConfig()
+        .getPartitionInterval()) {
+      return getErrorResult(String
+          .format("Partition interval mismatch: the sender <%d>, the receiver <%d>",
+              info.partitionInterval,
+              IoTDBDescriptor.getInstance().getConfig().getPartitionInterval()));
+    }
     if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) {
       senderName.set(ipAddress + SyncConstant.SYNC_DIR_NAME_SEPARATOR + uuid);
       if (checkRecovery()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index 6c4c569..d6e360e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -102,16 +102,6 @@ public class SyncFileManager implements ISyncFileManager {
           .equals(TsFileConstant.PATH_UPGRADE)) {
         continue;
       }
-      try {
-        if (!MManager.getInstance().getStorageGroupName(sgFolder.getName())
-            .equals(sgFolder.getName())) {
-          // the folder is not a sg folder
-          continue;
-        }
-      } catch (MetadataException e) {
-        // the folder is not a sg folder
-        continue;
-      }
       allSGs.putIfAbsent(sgFolder.getName(), new HashSet<>());
       currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashMap<>());
       for (File timeRangeFolder : sgFolder.listFiles()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 653c6c2..e627e43 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.SyncConnectionException;
@@ -67,6 +68,7 @@ import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncStatus;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -286,9 +288,12 @@ public class SyncClient implements ISyncClient {
 
   @Override
   public void confirmIdentity() throws SyncConnectionException {
-    try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())){
+    try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) {
+      ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(),
+          getOrCreateUUID(getUuidFile()),
+          IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION);
       SyncStatus status = serviceClient
-          .check(socket.getLocalAddress().getHostAddress(), getOrCreateUUID(getUuidFile()));
+          .check(info);
       if (status.code != SUCCESS_CODE) {
         throw new SyncConnectionException(
             "The receiver rejected the synchronization task because " + status.msg);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 765c17a..64f04c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -55,6 +55,7 @@ public class FileLoaderUtils {
     } else {
       tsFileResource.deserialize();
     }
+    tsFileResource.setClosed(true);
   }
 
   public static void updateTsFileResource(TsFileMetaData metaData, TsFileSequenceReader reader,
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index 9b20673..3e243f4 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -18,15 +18,28 @@
  */
 namespace java org.apache.iotdb.service.sync.thrift
 
-typedef i32 int 
-
 struct SyncStatus{
-  required i32 code
-  required string msg
+  1:required i32 code
+  2:required string msg
+}
+
+// The sender and receiver need to check some info to confirm validity
+struct ConfirmInfo{
+  // check whether the ip of sender is in thw white list of receiver.
+  1:string address
+
+  // Sender needs to tell receiver its identity.
+  2:string uuid
+
+  // The partition interval of sender and receiver need to be the same.
+  3:i64 partitionInterval
+
+  // The version of sender and receiver need to be the same.
+  4:string version
 }
 
 service SyncService{
-	SyncStatus check(1:string address, 2:string uuid)
+	SyncStatus check(ConfirmInfo info)
 	SyncStatus startSync();
 	SyncStatus init(1:string storageGroupName)
 	SyncStatus syncDeletedFileName(1:string fileName)