You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2020/12/22 16:31:06 UTC

[iotdb] 01/01: fix sync schema bug

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch fix_sync_schema_bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 128e1914d6d259db1c5504688ba84c05c07ddcde
Author: lta <li...@163.com>
AuthorDate: Wed Dec 23 00:30:16 2020 +0800

    fix sync schema bug
---
 .../db/sync/receiver/transfer/SyncServiceImpl.java |  5 ++-
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 45 ++++++----------------
 2 files changed, 14 insertions(+), 36 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 7d8df48..3572bac 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -270,10 +270,11 @@ public class SyncServiceImpl implements SyncService.Iface {
   private void loadMetadata() {
     logger.info("Start to load metadata in sync process.");
     if (currentFile.get().exists()) {
-      try (MLogReader mLogReader = new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG)) {
+      try (MLogReader mLogReader = new MLogReader(currentFile.get())) {
         while (mLogReader.hasNext()) {
-          PhysicalPlan plan = mLogReader.next();
+          PhysicalPlan plan = null;
           try {
+            plan = mLogReader.next();
             if (plan == null) {
               continue;
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 838d74e..5e347f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -72,7 +72,6 @@ import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncStatus;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -89,8 +88,6 @@ public class SyncClient implements ISyncClient {
 
   private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
 
-  private static final int BATCH_LINE = 1000;
-
   private static final int TIMEOUT_MS = 1000;
 
   /**
@@ -99,7 +96,7 @@ public class SyncClient implements ISyncClient {
    * location is recorded once after each synchronization task for the next synchronization task to
    * use.
    */
-  private int schemaFileLinePos;
+  private int schemaFilePos;
 
   private TTransport transport;
 
@@ -365,45 +362,25 @@ public class SyncClient implements ISyncClient {
   }
 
   private boolean tryToSyncSchema() {
-    int schemaPos = readSyncSchemaPos(getSchemaPosFile());
+    schemaFilePos = readSyncSchemaPos(getSchemaPosFile());
 
     // start to sync file data and get digest of this file.
-    try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
+    try (FileInputStream fis = new FileInputStream(getSchemaLogFile());
         ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) {
-      schemaFileLinePos = 0;
-      String line;
-      while (schemaFileLinePos < schemaPos) {
-        line = br.readLine();
-        schemaFileLinePos++;
-      }
+      fis.skip(schemaFilePos);
       MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME);
-      int cntLine = 0;
-      while ((line = br.readLine()) != null) {
-        schemaFileLinePos++;
-        byte[] singleLineData = BytesUtils.stringToBytes(line);
-        bos.write(singleLineData);
-        bos.write("\r\n".getBytes());
-        if (cntLine++ == BATCH_LINE) {
-          md.update(bos.toByteArray());
-          ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
-          bos.reset();
-          SyncStatus status = serviceClient.syncData(buffToSend);
-          if (status.code != SUCCESS_CODE) {
-            logger.error("Receiver failed to receive metadata because {}, retry.", status.msg);
-            return false;
-          }
-          cntLine = 0;
-        }
-      }
-      if (bos.size() != 0) {
-        md.update(bos.toByteArray());
+      byte[] buffer = new byte[SyncConstant.DATA_CHUNK_SIZE];
+      int dataLength;
+      while ((dataLength = fis.read(buffer)) != -1) {
+        bos.write(buffer, 0, dataLength);
+        md.update(buffer, 0, dataLength);
         ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
-        bos.reset();
         SyncStatus status = serviceClient.syncData(buffToSend);
         if (status.code != SUCCESS_CODE) {
           logger.error("Receiver failed to receive metadata because {}, retry.", status.msg);
           return false;
         }
+        schemaFilePos += dataLength;
       }
 
       // check digest
@@ -453,7 +430,7 @@ public class SyncClient implements ISyncClient {
         syncSchemaLogFile.createNewFile();
       }
       try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
-        br.write(Integer.toString(schemaFileLinePos));
+        br.write(Integer.toString(schemaFilePos));
       }
     } catch (IOException e) {
       logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);