You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/09/04 03:27:25 UTC

[incubator-iotdb] branch reimpl_sync updated (2418388 -> 0abcba8)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 2418388  Merge branch 'master' into reimpl_sync
     new ae465bb  remove useless file
     new d287ba1  fix some acute bug in testing
     new 0abcba8  fix a bug in windows environment

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 server/0seq-0-0.tsfile.merge                       | Bin 119406 -> 0 bytes
 server/1seq-1-1.tsfile.merge                       | Bin 119406 -> 0 bytes
 server/2seq-2-2.tsfile.merge                       | Bin 119406 -> 0 bytes
 server/3seq-3-3.tsfile.merge                       | Bin 119406 -> 0 bytes
 server/4seq-4-4.tsfile.merge                       | Bin 120297 -> 0 bytes
 .../receiver/recover/SyncReceiverLogAnalyzer.java  |   2 ++
 .../db/sync/receiver/transfer/SyncServiceImpl.java |  34 +++++++++++++-------
 .../db/sync/sender/conf/SyncSenderConfig.java      |   5 +--
 .../db/sync/sender/recover/SyncSenderLogger.java   |   5 ++-
 .../sync/sender/transfer/DataTransferManager.java  |  35 ++++++++++++---------
 .../sync/sender/recover/SyncSenderLoggerTest.java  |   1 +
 11 files changed, 53 insertions(+), 29 deletions(-)
 delete mode 100644 server/0seq-0-0.tsfile.merge
 delete mode 100644 server/1seq-1-1.tsfile.merge
 delete mode 100644 server/2seq-2-2.tsfile.merge
 delete mode 100644 server/3seq-3-3.tsfile.merge
 delete mode 100644 server/4seq-4-4.tsfile.merge


[incubator-iotdb] 03/03: fix a bug in windows environment

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0abcba81e03334a761621542c4676a8d7607ec17
Author: lta <li...@163.com>
AuthorDate: Wed Sep 4 11:22:35 2019 +0800

    fix a bug in windows environment
---
 .../apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java  | 9 ++++++---
 .../apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java    | 5 ++++-
 .../iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java       | 1 +
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index a83a245..ae6b474 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.sync.receiver.transfer;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.math.BigInteger;
@@ -216,7 +215,9 @@ public class SyncServiceImpl implements SyncService.Iface {
   public ResultStatus checkDataMD5(String md5OfSender) throws TException {
     String md5OfReceiver = (new BigInteger(1, messageDigest.get().digest())).toString(16);
     try {
-      currentFileWriter.get().close();
+      if (currentFileWriter.get() != null && currentFileWriter.get().isOpen()) {
+        currentFileWriter.get().close();
+      }
       if (!md5OfSender.equals(md5OfReceiver)) {
         currentFile.get().delete();
         currentFileWriter.set(new FileOutputStream(currentFile.get()).getChannel());
@@ -312,7 +313,9 @@ public class SyncServiceImpl implements SyncService.Iface {
   @Override
   public ResultStatus endSync() throws TException {
     try {
-      syncLog.get().close();
+      if (syncLog.get() != null) {
+        syncLog.get().close();
+      }
       FileLoaderManager.getInstance().getFileLoader(senderName.get()).endSync();
       logger.info("Sync process for data of storage group {} ended.", currentSG.get());
     } catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
index 0b78d3b..c6b5360 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
@@ -66,6 +66,9 @@ public class SyncSenderLogger implements ISyncSenderLogger {
 
   @Override
   public void close() throws IOException {
-    bw.close();
+    if(bw != null) {
+      bw.close();
+      bw = null;
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java
index 0cb1205..aaddaa6 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java
@@ -78,6 +78,7 @@ public class SyncSenderLoggerTest {
       toBeSyncedFiles
           .add(new File(config.getSenderFolderPath(), "new" + i).getAbsolutePath());
     }
+    senderLogger.close();
     int count = 0;
     int mode = 0;
     try (BufferedReader br = new BufferedReader(


[incubator-iotdb] 01/03: remove useless file

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit ae465bba393655e5304fc01136c58b992ac01b51
Author: lta <li...@163.com>
AuthorDate: Wed Sep 4 09:27:23 2019 +0800

    remove useless file
---
 server/0seq-0-0.tsfile.merge | Bin 119406 -> 0 bytes
 server/1seq-1-1.tsfile.merge | Bin 119406 -> 0 bytes
 server/2seq-2-2.tsfile.merge | Bin 119406 -> 0 bytes
 server/3seq-3-3.tsfile.merge | Bin 119406 -> 0 bytes
 server/4seq-4-4.tsfile.merge | Bin 120297 -> 0 bytes
 5 files changed, 0 insertions(+), 0 deletions(-)

diff --git a/server/0seq-0-0.tsfile.merge b/server/0seq-0-0.tsfile.merge
deleted file mode 100644
index d1762fe..0000000
Binary files a/server/0seq-0-0.tsfile.merge and /dev/null differ
diff --git a/server/1seq-1-1.tsfile.merge b/server/1seq-1-1.tsfile.merge
deleted file mode 100644
index 035db68..0000000
Binary files a/server/1seq-1-1.tsfile.merge and /dev/null differ
diff --git a/server/2seq-2-2.tsfile.merge b/server/2seq-2-2.tsfile.merge
deleted file mode 100644
index def7537..0000000
Binary files a/server/2seq-2-2.tsfile.merge and /dev/null differ
diff --git a/server/3seq-3-3.tsfile.merge b/server/3seq-3-3.tsfile.merge
deleted file mode 100644
index 4336151..0000000
Binary files a/server/3seq-3-3.tsfile.merge and /dev/null differ
diff --git a/server/4seq-4-4.tsfile.merge b/server/4seq-4-4.tsfile.merge
deleted file mode 100644
index 27c316b..0000000
Binary files a/server/4seq-4-4.tsfile.merge and /dev/null differ


[incubator-iotdb] 02/03: fix some acute bug in testing

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d287ba13fefa56a1909e96362def3fbb52bf8329
Author: lta <li...@163.com>
AuthorDate: Wed Sep 4 11:14:37 2019 +0800

    fix some acute bug in testing
---
 .../receiver/recover/SyncReceiverLogAnalyzer.java  |  2 ++
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 25 ++++++++++------
 .../db/sync/sender/conf/SyncSenderConfig.java      |  5 ++--
 .../sync/sender/transfer/DataTransferManager.java  | 35 +++++++++++++---------
 4 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
index 375c8a1..b4899ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.sync.receiver.load.FileLoader;
 import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
@@ -65,6 +66,7 @@ public class SyncReceiverLogAnalyzer implements ISyncReceiverLogAnalyzer {
     // check the state
     if (!new File(senderFolder, SyncConstant.SYNC_LOG_NAME).exists()) {
       new File(senderFolder, SyncConstant.LOAD_LOG_NAME).delete();
+      FileUtils.deleteDirectory(new File(senderFolder, SyncConstant.RECEIVER_DATA_FOLDER_NAME));
       return true;
     }
     if (FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName())) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 8a6064a..a83a245 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -27,10 +27,10 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -103,7 +103,9 @@ public class SyncServiceImpl implements SyncService.Iface {
       if (currentFileWriter.get() != null && currentFileWriter.get().isOpen()) {
         currentFileWriter.get().close();
       }
-      syncLog.get().close();
+      if (syncLog.get() != null) {
+        syncLog.get().close();
+      }
       return SyncReceiverLogAnalyzer.getInstance().recover(senderName.get());
     } catch (IOException e) {
       logger.error("Check recovery state fail", e);
@@ -117,7 +119,7 @@ public class SyncServiceImpl implements SyncService.Iface {
       initPath();
       currentSG.remove();
       FileLoader.createFileLoader(senderName.get(), syncFolderPath.get());
-      syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), SyncConstant.SYNC_LOG_NAME)));
+      syncLog.set(new SyncReceiverLogger(new File(syncFolderPath.get(), SyncConstant.SYNC_LOG_NAME)));
       return getSuccessResult();
     } catch (DiskSpaceInsufficientException | IOException e) {
       logger.error("Can not receiver data from sender", e);
@@ -175,6 +177,7 @@ public class SyncServiceImpl implements SyncService.Iface {
       } else {
         file = new File(getSyncDataPath(), currentSG.get() + File.separatorChar + filename);
       }
+      file.delete();
       currentFile.set(file);
       if (!file.getParentFile().exists()) {
         file.getParentFile().mkdirs();
@@ -184,7 +187,8 @@ public class SyncServiceImpl implements SyncService.Iface {
       }
       currentFileWriter.set(new FileOutputStream(file).getChannel());
       syncLog.get().startSyncTsFiles();
-    } catch (IOException e) {
+      messageDigest.set(MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME));
+    } catch (IOException | NoSuchAlgorithmException e) {
       logger.error("Can not init sync resource for file {}", filename, e);
       return getErrorResult(
           String.format("Can not init sync resource for file %s because %s", filename,
@@ -197,6 +201,7 @@ public class SyncServiceImpl implements SyncService.Iface {
   public ResultStatus syncData(ByteBuffer buff) {
     try {
       currentFileWriter.get().write(buff);
+      buff.flip();
       messageDigest.get().update(buff);
     } catch (IOException e) {
       logger.error("Can not sync data for file {}", currentFile.get().getAbsoluteFile(), e);
@@ -213,7 +218,7 @@ public class SyncServiceImpl implements SyncService.Iface {
     try {
       currentFileWriter.get().close();
       if (!md5OfSender.equals(md5OfReceiver)) {
-        FileUtils.forceDelete(currentFile.get());
+        currentFile.get().delete();
         currentFileWriter.set(new FileOutputStream(currentFile.get()).getChannel());
       } else {
         if (currentFile.get().getName().endsWith(MetadataConstant.METADATA_LOG)) {
@@ -241,13 +246,15 @@ public class SyncServiceImpl implements SyncService.Iface {
           new java.io.FileReader(currentFile.get()))) {
         String metadataOperation;
         while ((metadataOperation = br.readLine()) != null) {
-          operation(metadataOperation);
+          try {
+            operation(metadataOperation);
+          } catch (IOException | MetadataErrorException | PathErrorException e) {
+            // multiple insert schema, ignore it.
+          }
         }
-      } catch (FileNotFoundException e) {
+      } catch (IOException e) {
         logger.error("Cannot read the file {}.", currentFile.get().getAbsoluteFile(), e);
         return false;
-      } catch (IOException | MetadataErrorException | PathErrorException e) {
-        // multiple insert schema, ignore it.
       }
     }
     return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index cc6f11c..3216558 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.sync.sender.conf;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 
 public class SyncSenderConfig {
@@ -27,7 +28,7 @@ public class SyncSenderConfig {
 
   private int serverPort = 5555;
 
-  private int syncPeriodInSecond = 10;
+  private int syncPeriodInSecond = 600;
 
   private String senderFolderPath;
 
@@ -42,7 +43,7 @@ public class SyncSenderConfig {
   /**
    * Storage groups which participate in sync process
    */
-  private List<String> storageGroupList;
+  private List<String> storageGroupList = new ArrayList<>();
 
   /**
    * Update paths based on data directory
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 0952fe5..3d550a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -132,16 +132,20 @@ public class DataTransferManager implements IDataTransferManager {
    * running.
    */
   private void verifySingleton() throws IOException {
-    File lockFile = new File(config.getLockFilePath());
-    if (!lockFile.getParentFile().exists()) {
-      lockFile.getParentFile().mkdirs();
-    }
-    if (!lockFile.exists()) {
-      lockFile.createNewFile();
-    }
-    if (!lockInstance(config.getLockFilePath())) {
-      logger.error("Sync client is already running.");
-      System.exit(1);
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      config.update(dataDir);
+      File lockFile = new File(config.getLockFilePath());
+      if (!lockFile.getParentFile().exists()) {
+        lockFile.getParentFile().mkdirs();
+      }
+      if (!lockFile.exists()) {
+        lockFile.createNewFile();
+      }
+      if (!lockInstance(config.getLockFilePath())) {
+        logger.error("Sync client is already running.");
+        System.exit(1);
+      }
     }
   }
 
@@ -343,8 +347,9 @@ public class DataTransferManager implements IDataTransferManager {
     try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
         ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) {
       schemaFileLinePos = 0;
-      while (schemaFileLinePos++ <= schemaPos) {
+      while (schemaFileLinePos < schemaPos) {
         br.readLine();
+        schemaFileLinePos++;
       }
       MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME);
       String line;
@@ -353,8 +358,9 @@ public class DataTransferManager implements IDataTransferManager {
         schemaFileLinePos++;
         byte[] singleLineData = BytesUtils.stringToBytes(line);
         bos.write(singleLineData);
-        md.update(singleLineData);
+        bos.write("\r\n".getBytes());
         if (cntLine++ == BATCH_LINE) {
+          md.update(bos.toByteArray());
           ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
           bos.reset();
           ResultStatus status = serviceClient.syncData(buffToSend);
@@ -366,6 +372,7 @@ public class DataTransferManager implements IDataTransferManager {
         }
       }
       if (bos.size() != 0) {
+        md.update(bos.toByteArray());
         ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
         bos.reset();
         ResultStatus status = serviceClient.syncData(buffToSend);
@@ -376,7 +383,7 @@ public class DataTransferManager implements IDataTransferManager {
       }
 
       // check md5
-      return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
+      return checkMD5ForSchema(new BigInteger(1, md.digest()).toString(16));
     } catch (NoSuchAlgorithmException | IOException | TException e) {
       logger.error("Can not finish transfer schema to receiver", e);
       return false;
@@ -497,7 +504,7 @@ public class DataTransferManager implements IDataTransferManager {
       try {
         File snapshotFile = makeFileSnapshot(tsfile);
         // firstly sync .restore file, then sync tsfile
-        syncSingleFile(new File(snapshotFile, TsFileResource.RESOURCE_SUFFIX));
+        syncSingleFile(new File(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
         syncSingleFile(snapshotFile);
         lastLocalFilesMap.get(sgName).add(tsfile);
         syncLog.finishSyncTsfile(tsfile);