You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/09/04 03:27:27 UTC

[incubator-iotdb] 02/03: fix some acute bugs found in testing

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d287ba13fefa56a1909e96362def3fbb52bf8329
Author: lta <li...@163.com>
AuthorDate: Wed Sep 4 11:14:37 2019 +0800

    fix some acute bugs found in testing
---
 .../receiver/recover/SyncReceiverLogAnalyzer.java  |  2 ++
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 25 ++++++++++------
 .../db/sync/sender/conf/SyncSenderConfig.java      |  5 ++--
 .../sync/sender/transfer/DataTransferManager.java  | 35 +++++++++++++---------
 4 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
index 375c8a1..b4899ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.sync.receiver.load.FileLoader;
 import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
@@ -65,6 +66,7 @@ public class SyncReceiverLogAnalyzer implements ISyncReceiverLogAnalyzer {
     // check the state
     if (!new File(senderFolder, SyncConstant.SYNC_LOG_NAME).exists()) {
       new File(senderFolder, SyncConstant.LOAD_LOG_NAME).delete();
+      FileUtils.deleteDirectory(new File(senderFolder, SyncConstant.RECEIVER_DATA_FOLDER_NAME));
       return true;
     }
     if (FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName())) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 8a6064a..a83a245 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -27,10 +27,10 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -103,7 +103,9 @@ public class SyncServiceImpl implements SyncService.Iface {
       if (currentFileWriter.get() != null && currentFileWriter.get().isOpen()) {
         currentFileWriter.get().close();
       }
-      syncLog.get().close();
+      if (syncLog.get() != null) {
+        syncLog.get().close();
+      }
       return SyncReceiverLogAnalyzer.getInstance().recover(senderName.get());
     } catch (IOException e) {
       logger.error("Check recovery state fail", e);
@@ -117,7 +119,7 @@ public class SyncServiceImpl implements SyncService.Iface {
       initPath();
       currentSG.remove();
       FileLoader.createFileLoader(senderName.get(), syncFolderPath.get());
-      syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), SyncConstant.SYNC_LOG_NAME)));
+      syncLog.set(new SyncReceiverLogger(new File(syncFolderPath.get(), SyncConstant.SYNC_LOG_NAME)));
       return getSuccessResult();
     } catch (DiskSpaceInsufficientException | IOException e) {
       logger.error("Can not receiver data from sender", e);
@@ -175,6 +177,7 @@ public class SyncServiceImpl implements SyncService.Iface {
       } else {
         file = new File(getSyncDataPath(), currentSG.get() + File.separatorChar + filename);
       }
+      file.delete();
       currentFile.set(file);
       if (!file.getParentFile().exists()) {
         file.getParentFile().mkdirs();
@@ -184,7 +187,8 @@ public class SyncServiceImpl implements SyncService.Iface {
       }
       currentFileWriter.set(new FileOutputStream(file).getChannel());
       syncLog.get().startSyncTsFiles();
-    } catch (IOException e) {
+      messageDigest.set(MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME));
+    } catch (IOException | NoSuchAlgorithmException e) {
       logger.error("Can not init sync resource for file {}", filename, e);
       return getErrorResult(
           String.format("Can not init sync resource for file %s because %s", filename,
@@ -197,6 +201,7 @@ public class SyncServiceImpl implements SyncService.Iface {
   public ResultStatus syncData(ByteBuffer buff) {
     try {
       currentFileWriter.get().write(buff);
+      buff.flip();
       messageDigest.get().update(buff);
     } catch (IOException e) {
       logger.error("Can not sync data for file {}", currentFile.get().getAbsoluteFile(), e);
@@ -213,7 +218,7 @@ public class SyncServiceImpl implements SyncService.Iface {
     try {
       currentFileWriter.get().close();
       if (!md5OfSender.equals(md5OfReceiver)) {
-        FileUtils.forceDelete(currentFile.get());
+        currentFile.get().delete();
         currentFileWriter.set(new FileOutputStream(currentFile.get()).getChannel());
       } else {
         if (currentFile.get().getName().endsWith(MetadataConstant.METADATA_LOG)) {
@@ -241,13 +246,15 @@ public class SyncServiceImpl implements SyncService.Iface {
           new java.io.FileReader(currentFile.get()))) {
         String metadataOperation;
         while ((metadataOperation = br.readLine()) != null) {
-          operation(metadataOperation);
+          try {
+            operation(metadataOperation);
+          } catch (IOException | MetadataErrorException | PathErrorException e) {
+            // multiple insert schema, ignore it.
+          }
         }
-      } catch (FileNotFoundException e) {
+      } catch (IOException e) {
         logger.error("Cannot read the file {}.", currentFile.get().getAbsoluteFile(), e);
         return false;
-      } catch (IOException | MetadataErrorException | PathErrorException e) {
-        // multiple insert schema, ignore it.
       }
     }
     return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index cc6f11c..3216558 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.sync.sender.conf;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 
 public class SyncSenderConfig {
@@ -27,7 +28,7 @@ public class SyncSenderConfig {
 
   private int serverPort = 5555;
 
-  private int syncPeriodInSecond = 10;
+  private int syncPeriodInSecond = 600;
 
   private String senderFolderPath;
 
@@ -42,7 +43,7 @@ public class SyncSenderConfig {
   /**
    * Storage groups which participate in sync process
    */
-  private List<String> storageGroupList;
+  private List<String> storageGroupList = new ArrayList<>();
 
   /**
    * Update paths based on data directory
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 0952fe5..3d550a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -132,16 +132,20 @@ public class DataTransferManager implements IDataTransferManager {
    * running.
    */
   private void verifySingleton() throws IOException {
-    File lockFile = new File(config.getLockFilePath());
-    if (!lockFile.getParentFile().exists()) {
-      lockFile.getParentFile().mkdirs();
-    }
-    if (!lockFile.exists()) {
-      lockFile.createNewFile();
-    }
-    if (!lockInstance(config.getLockFilePath())) {
-      logger.error("Sync client is already running.");
-      System.exit(1);
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      config.update(dataDir);
+      File lockFile = new File(config.getLockFilePath());
+      if (!lockFile.getParentFile().exists()) {
+        lockFile.getParentFile().mkdirs();
+      }
+      if (!lockFile.exists()) {
+        lockFile.createNewFile();
+      }
+      if (!lockInstance(config.getLockFilePath())) {
+        logger.error("Sync client is already running.");
+        System.exit(1);
+      }
     }
   }
 
@@ -343,8 +347,9 @@ public class DataTransferManager implements IDataTransferManager {
     try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
         ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) {
       schemaFileLinePos = 0;
-      while (schemaFileLinePos++ <= schemaPos) {
+      while (schemaFileLinePos < schemaPos) {
         br.readLine();
+        schemaFileLinePos++;
       }
       MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME);
       String line;
@@ -353,8 +358,9 @@ public class DataTransferManager implements IDataTransferManager {
         schemaFileLinePos++;
         byte[] singleLineData = BytesUtils.stringToBytes(line);
         bos.write(singleLineData);
-        md.update(singleLineData);
+        bos.write("\r\n".getBytes());
         if (cntLine++ == BATCH_LINE) {
+          md.update(bos.toByteArray());
           ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
           bos.reset();
           ResultStatus status = serviceClient.syncData(buffToSend);
@@ -366,6 +372,7 @@ public class DataTransferManager implements IDataTransferManager {
         }
       }
       if (bos.size() != 0) {
+        md.update(bos.toByteArray());
         ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
         bos.reset();
         ResultStatus status = serviceClient.syncData(buffToSend);
@@ -376,7 +383,7 @@ public class DataTransferManager implements IDataTransferManager {
       }
 
       // check md5
-      return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
+      return checkMD5ForSchema(new BigInteger(1, md.digest()).toString(16));
     } catch (NoSuchAlgorithmException | IOException | TException e) {
       logger.error("Can not finish transfer schema to receiver", e);
       return false;
@@ -497,7 +504,7 @@ public class DataTransferManager implements IDataTransferManager {
       try {
         File snapshotFile = makeFileSnapshot(tsfile);
         // firstly sync .restore file, then sync tsfile
-        syncSingleFile(new File(snapshotFile, TsFileResource.RESOURCE_SUFFIX));
+        syncSingleFile(new File(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
         syncSingleFile(snapshotFile);
         lastLocalFilesMap.get(sgName).add(tsfile);
         syncLog.finishSyncTsfile(tsfile);