You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/17 15:39:30 UTC

[incubator-iotdb] branch master updated: Fix bugs of add duplicated metadata (#918)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a92b80c  Fix bugs of add duplicated metadata (#918)
a92b80c is described below

commit a92b80cdd72f030b90b4d150bfd9fedc8b1e4a32
Author: Tianan Li <li...@163.com>
AuthorDate: Tue Mar 17 23:39:23 2020 +0800

    Fix bugs of add duplicated metadata (#918)
    
    * fix bug of add error mlog then recover an empty MTree
---
 .../engine/storagegroup/StorageGroupProcessor.java | 17 +++++++++++++---
 .../org/apache/iotdb/db/metadata/MManager.java     |  2 +-
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  3 +++
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 16 ++++++++++++++-
 .../db/sync/sender/manage/SyncFileManager.java     | 10 ----------
 .../iotdb/db/sync/sender/transfer/SyncClient.java  |  9 +++++++--
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |  1 +
 .../org/apache/iotdb/db/metadata/MTreeTest.java    |  2 +-
 service-rpc/src/main/thrift/sync.thrift            | 23 +++++++++++++++++-----
 9 files changed, 60 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 674c0fd..6642ae1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1537,8 +1537,9 @@ public class StorageGroupProcessor {
     writeLock();
     mergeLock.writeLock().lock();
     try {
-      loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource);
-      updateLatestTimeMap(newTsFileResource);
+      if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource)){
+        updateLatestTimeMap(newTsFileResource);
+      }
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1780,8 +1781,9 @@ public class StorageGroupProcessor {
    * @param type load type
    * @param tsFileResource tsfile resource to be loaded
    * @UsedBy sync module, load external tsfile module.
+   * @return true if the file was loaded; false if it was already present in the target file list
    */
-  private void loadTsFileByType(LoadTsFileType type, File syncedTsFile,
+  private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
       TsFileResource tsFileResource)
       throws TsFileProcessorException, DiskSpaceInsufficientException {
     File targetFile;
@@ -1793,6 +1795,10 @@ public class StorageGroupProcessor {
             storageGroupName + File.separatorChar + timeRangeId + File.separator + tsFileResource
                 .getFile().getName());
         tsFileResource.setFile(targetFile);
+        if(unSequenceFileList.contains(tsFileResource)){
+          logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
+          return false;
+        }
         unSequenceFileList.add(tsFileResource);
         logger.info("Load tsfile in unsequence list, move file from {} to {}",
             syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
@@ -1803,6 +1809,10 @@ public class StorageGroupProcessor {
                 storageGroupName + File.separatorChar + timeRangeId + File.separator
                     + tsFileResource.getFile().getName());
         tsFileResource.setFile(targetFile);
+        if(sequenceFileTreeSet.contains(tsFileResource)){
+          logger.error("The file {} has already been loaded in sequence list", tsFileResource);
+          return false;
+        }
         sequenceFileTreeSet.add(tsFileResource);
         logger.info("Load tsfile in sequence list, move file from {} to {}",
             syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
@@ -1840,6 +1850,7 @@ public class StorageGroupProcessor {
           syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
           e.getMessage()));
     }
+    return true;
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index afd72f3..94a382f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -453,13 +453,13 @@ public class MManager {
   public void setStorageGroup(String storageGroup) throws MetadataException {
     lock.writeLock().lock();
     try {
+      mtree.setStorageGroup(storageGroup);
       if (writeToLog) {
         BufferedWriter writer = getLogWriter();
         writer.write(MetadataOperationType.SET_STORAGE_GROUP + "," + storageGroup);
         writer.newLine();
         writer.flush();
       }
-      mtree.setStorageGroup(storageGroup);
       IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
       ActiveTimeSeriesCounter.getInstance().init(storageGroup);
       seriesNumberInStorageGroups.put(storageGroup, 0);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 4869b70..896b821 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -102,6 +102,9 @@ public class MTree implements Serializable {
     }
     MNode leaf = new LeafMNode(cur, nodeNames[nodeNames.length - 1], dataType, encoding,
         compressor, props);
+    if (cur.hasChild(leaf.getName())) {
+      throw new MetadataException(String.format("The timeseries %s has already existed.", path));
+    }
     cur.addChild(leaf);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 6096a53..169a689 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -29,6 +29,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -45,6 +46,7 @@ import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
 import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncStatus;
 import org.apache.thrift.TException;
@@ -75,8 +77,20 @@ public class SyncServiceImpl implements SyncService.Iface {
    * Verify IP address of sender
    */
   @Override
-  public SyncStatus check(String ipAddress, String uuid) {
+  public SyncStatus check(ConfirmInfo info) {
+    String ipAddress = info.address, uuid = info.uuid;
     Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
+    if (!info.version.equals(IoTDBConstant.VERSION)) {
+      return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>",
+          info.version, IoTDBConstant.VERSION));
+    }
+    if (info.partitionInterval != IoTDBDescriptor.getInstance().getConfig()
+        .getPartitionInterval()) {
+      return getErrorResult(String
+          .format("Partition interval mismatch: the sender <%d>, the receiver <%d>",
+              info.partitionInterval,
+              IoTDBDescriptor.getInstance().getConfig().getPartitionInterval()));
+    }
     if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) {
       senderName.set(ipAddress + SyncConstant.SYNC_DIR_NAME_SEPARATOR + uuid);
       if (checkRecovery()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index 6c4c569..d6e360e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -102,16 +102,6 @@ public class SyncFileManager implements ISyncFileManager {
           .equals(TsFileConstant.PATH_UPGRADE)) {
         continue;
       }
-      try {
-        if (!MManager.getInstance().getStorageGroupName(sgFolder.getName())
-            .equals(sgFolder.getName())) {
-          // the folder is not a sg folder
-          continue;
-        }
-      } catch (MetadataException e) {
-        // the folder is not a sg folder
-        continue;
-      }
       allSGs.putIfAbsent(sgFolder.getName(), new HashSet<>());
       currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashMap<>());
       for (File timeRangeFolder : sgFolder.listFiles()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 653c6c2..e627e43 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.SyncConnectionException;
@@ -67,6 +68,7 @@ import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncStatus;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
@@ -286,9 +288,12 @@ public class SyncClient implements ISyncClient {
 
   @Override
   public void confirmIdentity() throws SyncConnectionException {
-    try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())){
+    try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) {
+      ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(),
+          getOrCreateUUID(getUuidFile()),
+          IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION);
       SyncStatus status = serviceClient
-          .check(socket.getLocalAddress().getHostAddress(), getOrCreateUUID(getUuidFile()));
+          .check(info);
       if (status.code != SUCCESS_CODE) {
         throw new SyncConnectionException(
             "The receiver rejected the synchronization task because " + status.msg);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 765c17a..64f04c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -55,6 +55,7 @@ public class FileLoaderUtils {
     } else {
       tsFileResource.deserialize();
     }
+    tsFileResource.setClosed(true);
   }
 
   public static void updateTsFileResource(TsFileMetaData metaData, TsFileSequenceReader reader,
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
index 6cd9783..3f6d67e 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
@@ -296,7 +296,7 @@ public class MTreeTest {
       root.setStorageGroup("root.laptop.d2");
       root.createTimeseries("root.laptop.d1.s1", TSDataType.INT32, TSEncoding.PLAIN,
           CompressionType.GZIP, null);
-      root.createTimeseries("root.laptop.d1.s1", TSDataType.INT32, TSEncoding.PLAIN,
+      root.createTimeseries("root.laptop.d1.s2", TSDataType.INT32, TSEncoding.PLAIN,
           CompressionType.GZIP, null);
 
       List<String> list = new ArrayList<>();
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index 9b20673..3e243f4 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -18,15 +18,28 @@
  */
 namespace java org.apache.iotdb.service.sync.thrift
 
-typedef i32 int 
-
 struct SyncStatus{
-  required i32 code
-  required string msg
+  1:required i32 code
+  2:required string msg
+}
+
+// The sender and receiver need to check some info to confirm validity
+struct ConfirmInfo{
+  // check whether the ip of sender is in the white list of receiver.
+  1:string address
+
+  // Sender needs to tell receiver its identity.
+  2:string uuid
+
+  // The partition interval of sender and receiver need to be the same.
+  3:i64 partitionInterval
+
+  // The version of sender and receiver need to be the same.
+  4:string version
 }
 
 service SyncService{
-	SyncStatus check(1:string address, 2:string uuid)
+	SyncStatus check(ConfirmInfo info)
 	SyncStatus startSync();
 	SyncStatus init(1:string storageGroupName)
 	SyncStatus syncDeletedFileName(1:string fileName)