You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/24 02:09:51 UTC

[GitHub] [iotdb] Cpaulyz commented on a diff in pull request #7038: [IOTDB-3950] Transport layer optimization for sync modules

Cpaulyz commented on code in PR #7038:
URL: https://github.com/apache/iotdb/pull/7038#discussion_r953256468


##########
server/src/main/java/org/apache/iotdb/db/sync/transport/client/ITransportClient.java:
##########
@@ -19,4 +19,25 @@
  */
 package org.apache.iotdb.db.sync.transport.client;
 
-public interface ITransportClient extends Runnable {}
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.pipedata.PipeData;
+
+public interface ITransportClient {
+  /**
+   * Create connection and handshake before sending messages
+   *
+   * @return true if success; false if failed to check IoTDB version.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  boolean handshake() throws SyncConnectionException;
+
+  /**
+   * Send {@link PipeData} to receiver and load.
+   *
+   * @return true if success; false if failed to send or load.
+   * @throws SyncConnectionException cannot create connection to receiver
+   */
+  boolean sendTransport(PipeData pipeData) throws SyncConnectionException;

Review Comment:
   rename to ` send(PipeData pipeData)` Because implementation of this method may call sendFile and sendPipeData



##########
server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java:
##########
@@ -218,157 +203,115 @@ private boolean verifyIP(String ipSegment, String ipAddress, int subnetMark) {
     return ipAddressBinary.equals(ipSegmentBinary);
   }
 
-  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
-      throws TException {
+  /**
+   * Receive {@link PipeData} and load it into IoTDB Engine.
+   *
+   * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load; {@link
+   *     TSStatusCode#SUCCESS_STATUS} if load successfully.
+   * @throws TException The connection between the sender and the receiver has not been established
+   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+   */
+  public TSStatus transportPipeData(ByteBuffer buff) throws TException {
+    // step1. check connection
     TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
-
+    logger.debug("Invoke transportPipeData method from client ip = {}", identityInfo.address);
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
-    TSyncTransportType type = metaInfo.type;
-    String fileName = metaInfo.fileName;
-    long startIndex = metaInfo.startIndex;
 
-    // Check file start index valid
-    if (type == TSyncTransportType.FILE) {
-      try {
-        CheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex);
-        if (!result.isResult()) {
-          return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REBASE, result.getIndex());
-        }
-      } catch (IOException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
-      }
-    }
-
-    // Check buff digest
-    int pos = buff.position();
-    MessageDigest messageDigest = null;
+    // step2. deserialize PipeData
+    PipeData pipeData;
     try {
-      messageDigest = MessageDigest.getInstance("SHA-256");
-    } catch (NoSuchAlgorithmException e) {
-      logger.error(e.getMessage());
-      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
-    }
-    messageDigest.update(buff);
-    byte[] digestBytes = new byte[digest.capacity()];
-    digest.get(digestBytes);
-    if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
-      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_RETRY, "Data digest check error");
-    }
-
-    if (type != TSyncTransportType.FILE) {
-      buff.position(pos);
       int length = buff.capacity();
       byte[] byteArray = new byte[length];
       buff.get(byteArray);
-      try {
-        PipeData pipeData = PipeData.createPipeData(byteArray);
-        if (type == TSyncTransportType.TSFILE) {
-          // Do with file
-          handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
-        }
-        logger.info(
-            "Start load pipeData with serialize number {} and type {},value={}",
-            pipeData.getSerialNumber(),
-            pipeData.getType(),
-            pipeData);
-        pipeData.createLoader().load();
-        logger.info(
-            "Load pipeData with serialize number {} successfully.", pipeData.getSerialNumber());
-      } catch (IOException | IllegalPathException e) {
-        logger.error("Pipe data transport error, {}", e.getMessage());
-        return RpcUtils.getStatus(
-            TSStatusCode.SYNC_FILE_RETRY, "Data digest transport error " + e.getMessage());
-      } catch (PipeDataLoadException e) {
-        logger.error("Fail to load pipeData because {}.", e.getMessage());
-        return RpcUtils.getStatus(
-            TSStatusCode.SYNC_FILE_ERROR, "Fail to load pipeData because " + e.getMessage());
-      }
-    } else {
-      // Write buff to {file}.patch
-      buff.position(pos);
-      File file = new File(fileDir, fileName + PATCH_SUFFIX);
-      try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
-        randomAccessFile.seek(startIndex);
-        int length = buff.capacity();
-        byte[] byteArray = new byte[length];
-        buff.get(byteArray);
-        randomAccessFile.write(byteArray);
-        writeRecordFile(new File(fileDir, fileName + RECORD_SUFFIX), startIndex + length);
-        logger.debug(
-            "Sync "
-                + fileName
-                + " start at "
-                + startIndex
-                + " to "
-                + (startIndex + length)
-                + " is done.");
-      } catch (IOException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
+      pipeData = PipeData.createPipeData(byteArray);
+      if (pipeData instanceof TsFilePipeData) {
+        handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
       }
+    } catch (IOException | IllegalPathException e) {
+      logger.error("Pipe data transport error, {}", e.getMessage());
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR, "Pipe data transport error, " + e.getMessage());
     }
+
+    // step3. load PipeData
+    logger.info(
+        "Start load pipeData with serialize number {} and type {},value={}",
+        pipeData.getSerialNumber(),
+        pipeData.getType(),
+        pipeData);
+    try {
+      pipeData.createLoader().load();
+      logger.info(
+          "Load pipeData with serialize number {} successfully.", pipeData.getSerialNumber());
+    } catch (PipeDataLoadException e) {
+      logger.error("Fail to load pipeData because {}.", e.getMessage());
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " + e.getMessage());
+    }
+
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
-  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+  /**
+   * Receive TsFile based on startIndex.
+   *
+   * @return {@link TSStatusCode#SUCCESS_STATUS} if receive successfully; {@link
+   *     TSStatusCode#SYNC_FILE_REBASE} if startIndex needs to rollback because mismatched; {@link
+   *     TSStatusCode#SYNC_FILE_ERROR} if fail to receive file.
+   * @throws TException The connection between the sender and the receiver has not been established
+   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+   */
+  public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
       throws TException {
+    // step1. check connection
     TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
     if (identityInfo == null) {
       throw new TException("Thrift connection is not alive.");
     }
-    logger.debug("Invoke checkFileDigest method from client ip = {}", identityInfo.address);
+    logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
+
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
-    synchronized (fileDir.intern()) {
-      String fileName = metaInfo.fileName;
-      MessageDigest messageDigest = null;
-      try {
-        messageDigest = MessageDigest.getInstance("SHA-256");
-      } catch (NoSuchAlgorithmException e) {
-        logger.error(e.getMessage());
-        return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, e.getMessage());
-      }
+    //    TSyncTransportType type = metaInfo.type;

Review Comment:
   done



##########
server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java:
##########
@@ -427,6 +366,22 @@ private TSyncIdentityInfo getCurrentTSyncIdentityInfo() {
     }
   }
 
+  /**
+   * Get current TSyncIdentityInfo
+   *
+   * @return -1

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org